Skip to main content

agentkit_loop/
lib.rs

1//! Runtime-agnostic agent loop orchestration for sessions, turns, tools, and interrupts.
2//!
3//! `agentkit-loop` is the central coordination layer in the agentkit workspace.  It
4//! drives a model through a multi-turn agentic loop, executing tool calls,
5//! respecting permission checks, surfacing approval and authentication interrupts
6//! to the host application, and optionally compacting the transcript when it grows
7//! too large.
8//!
9//! # Architecture
10//!
11//! The main entry point is [`Agent`], constructed via [`AgentBuilder`].  After
12//! calling [`Agent::start`] you receive a [`LoopDriver`] that yields
13//! [`LoopStep`]s -- either a finished turn or an interrupt that requires host
14//! resolution before the loop can continue.
15//!
16//! ```text
17//! Agent::builder()
18//!     .model(adapter)        // ModelAdapter implementation
19//!     .tools(registry)       // ToolRegistry with registered tools
20//!     .permissions(checker)  // PermissionChecker for gating tool use
21//!     .observer(obs)         // LoopObserver for streaming events
22//!     .build()?
23//!     .start(config).await?  -> LoopDriver
24//!         .submit_input(...)
25//!         .next().await?     -> LoopStep::Finished | LoopStep::Interrupt
26//! ```
27//!
28//! # Example
29//!
30//! ```rust,no_run
31//! use agentkit_loop::{Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig};
32//!
33//! # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
34//! let agent = Agent::builder()
35//!     .model(adapter)
36//!     .build()?;
37//!
38//! let mut driver = agent
39//!     .start(
40//!         SessionConfig::new("demo").with_cache(
41//!             PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
42//!         ),
43//!     )
44//!     .await?;
45//! # Ok(())
46//! # }
47//! ```
48
49use std::collections::{BTreeMap, VecDeque};
50use std::sync::Arc;
51
52use agentkit_compaction::{
53    CompactionConfig, CompactionContext, CompactionReason, CompactionResult,
54};
55use agentkit_core::{
56    CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
57    TextPart, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation, Usage,
58};
59use agentkit_task_manager::{
60    PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskAuth, TaskLaunchRequest, TaskManager,
61    TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
62};
63#[cfg(test)]
64use agentkit_tools_core::ToolContext;
65use agentkit_tools_core::{
66    ApprovalDecision, ApprovalRequest, AuthOperation, AuthRequest, AuthResolution,
67    BasicToolExecutor, OwnedToolContext, PermissionChecker, ToolError, ToolExecutor, ToolRegistry,
68    ToolRequest, ToolResources, ToolSpec,
69};
70use async_trait::async_trait;
71use serde::{Deserialize, Serialize};
72use thiserror::Error;
73
// Metadata keys in the `agentkit.` namespace. Their names suggest they
// annotate items from interrupted turns — NOTE(review): confirm against the
// code that writes them.
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";

// Reason string; by its name it is the value recorded for a user-initiated
// cancellation (unverified here).
const USER_CANCELLED_REASON: &str = "user_cancelled";
78
79/// Configuration required to start a new model session.
80///
81/// Pass this to [`Agent::start`] to initialise the underlying [`ModelSession`]
82/// and obtain a [`LoopDriver`].
83///
84/// # Example
85///
86/// ```rust
87/// use agentkit_loop::{PromptCacheRequest, PromptCacheRetention, SessionConfig};
88///
89/// let config = SessionConfig::new("my-session").with_cache(
90///     PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
91/// );
92/// ```
93#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
94pub struct SessionConfig {
95    /// Unique identifier for the session.
96    pub session_id: SessionId,
97    /// Arbitrary key-value metadata forwarded to the model adapter.
98    pub metadata: MetadataMap,
99    /// Default provider-side prompt caching policy for turns in this session.
100    pub cache: Option<PromptCacheRequest>,
101}
102
103impl SessionConfig {
104    /// Builds a session config with empty metadata and no cache policy.
105    pub fn new(session_id: impl Into<SessionId>) -> Self {
106        Self {
107            session_id: session_id.into(),
108            metadata: MetadataMap::new(),
109            cache: None,
110        }
111    }
112
113    /// Replaces the session metadata map.
114    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
115        self.metadata = metadata;
116        self
117    }
118
119    /// Sets the default prompt cache request for the session.
120    pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
121        self.cache = Some(cache);
122        self
123    }
124
125    /// Clears any default prompt cache request for the session.
126    pub fn without_cache(mut self) -> Self {
127        self.cache = None;
128        self
129    }
130}
131
132/// Strength of a prompt-cache request.
133///
134/// `BestEffort` lets adapters ignore unsupported controls while still using
135/// any provider-native automatic caching they support. `Required` upgrades
136/// unsupported cache requests into provider errors.
137#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub enum PromptCacheMode {
139    /// Disable prompt caching for this request.
140    Disabled,
141    /// Use caching when the provider can honor the request.
142    #[default]
143    BestEffort,
144    /// Fail the turn if the provider cannot honor the request.
145    Required,
146}
147
148/// High-level provider-neutral cache retention hint.
149///
150/// Providers map this to their native controls. For example, OpenAI maps
151/// `Short` to in-memory retention while OpenRouter Anthropic models map it to
152/// the default 5-minute ephemeral cache.
153#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
154pub enum PromptCacheRetention {
155    /// Use the provider's default cache retention.
156    Default,
157    /// Prefer the provider's short-lived cache retention mode.
158    Short,
159    /// Prefer the provider's longest generally available cache retention mode.
160    Extended,
161}
162
163/// Provider-neutral prompt caching strategy.
164#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
165pub enum PromptCacheStrategy {
166    /// Let the provider decide the cacheable prefix automatically.
167    #[default]
168    Automatic,
169    /// Apply explicit cache breakpoints to selected prefix boundaries.
170    Explicit {
171        /// Cache breakpoints in transcript/tool order.
172        breakpoints: Vec<PromptCacheBreakpoint>,
173    },
174}
175
176impl PromptCacheStrategy {
177    /// Uses the provider's native automatic cache behavior when available, or
178    /// any adapter-provided automatic planning fallback.
179    pub fn automatic() -> Self {
180        Self::Automatic
181    }
182
183    /// Uses explicit cache breakpoints.
184    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
185        Self::Explicit {
186            breakpoints: breakpoints.into_iter().collect(),
187        }
188    }
189}
190
191/// Prefix boundary that a provider should cache when using explicit caching.
192#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193pub enum PromptCacheBreakpoint {
194    /// Cache the tool schema prefix through the last available tool.
195    ToolsEnd,
196    /// Cache through the end of the transcript item at `index`.
197    TranscriptItemEnd { index: usize },
198    /// Cache through the specific transcript part.
199    ///
200    /// Not every adapter can target every part precisely; unsupported
201    /// fine-grained breakpoints become best-effort no-ops unless the request is
202    /// marked [`PromptCacheMode::Required`].
203    TranscriptPartEnd {
204        item_index: usize,
205        part_index: usize,
206    },
207}
208
209impl PromptCacheBreakpoint {
210    /// Cache the tool schema prefix through the last available tool.
211    pub fn tools_end() -> Self {
212        Self::ToolsEnd
213    }
214
215    /// Cache through the end of a transcript item.
216    pub fn transcript_item_end(index: usize) -> Self {
217        Self::TranscriptItemEnd { index }
218    }
219
220    /// Cache through a specific part within a transcript item.
221    pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
222        Self::TranscriptPartEnd {
223            item_index,
224            part_index,
225        }
226    }
227}
228
229/// Prompt caching request sent alongside a turn.
230#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
231pub struct PromptCacheRequest {
232    /// Strength of the caching request.
233    pub mode: PromptCacheMode,
234    /// Automatic or explicit caching strategy.
235    pub strategy: PromptCacheStrategy,
236    /// Optional provider-neutral retention hint.
237    pub retention: Option<PromptCacheRetention>,
238    /// Optional provider cache key or routing key.
239    pub key: Option<String>,
240}
241
242impl PromptCacheRequest {
243    /// Builds a best-effort automatic cache request.
244    pub fn automatic() -> Self {
245        Self::best_effort(PromptCacheStrategy::automatic())
246    }
247
248    /// Builds a required automatic cache request.
249    pub fn automatic_required() -> Self {
250        Self::required(PromptCacheStrategy::automatic())
251    }
252
253    /// Builds a best-effort explicit cache request.
254    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
255        Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
256    }
257
258    /// Builds a required explicit cache request.
259    pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
260        Self::required(PromptCacheStrategy::explicit(breakpoints))
261    }
262
263    /// Builds a disabled cache request.
264    pub fn disabled() -> Self {
265        Self {
266            mode: PromptCacheMode::Disabled,
267            strategy: PromptCacheStrategy::Automatic,
268            retention: None,
269            key: None,
270        }
271    }
272
273    /// Builds a best-effort cache request with the given strategy.
274    pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
275        Self {
276            mode: PromptCacheMode::BestEffort,
277            strategy,
278            retention: None,
279            key: None,
280        }
281    }
282
283    /// Builds a required cache request with the given strategy.
284    pub fn required(strategy: PromptCacheStrategy) -> Self {
285        Self {
286            mode: PromptCacheMode::Required,
287            strategy,
288            retention: None,
289            key: None,
290        }
291    }
292
293    /// Overrides the request mode.
294    pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
295        self.mode = mode;
296        self
297    }
298
299    /// Overrides the request strategy.
300    pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
301        self.strategy = strategy;
302        self
303    }
304
305    /// Applies a provider-neutral retention hint.
306    pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
307        self.retention = Some(retention);
308        self
309    }
310
311    /// Applies a provider cache key or routing key.
312    pub fn with_key(mut self, key: impl Into<String>) -> Self {
313        self.key = Some(key.into());
314        self
315    }
316
317    /// Clears any provider-neutral retention hint.
318    pub fn without_retention(mut self) -> Self {
319        self.retention = None;
320        self
321    }
322
323    /// Clears any provider cache key or routing key.
324    pub fn without_key(mut self) -> Self {
325        self.key = None;
326        self
327    }
328
329    /// Returns true when caching should be active for this request.
330    pub fn is_enabled(&self) -> bool {
331        !matches!(self.mode, PromptCacheMode::Disabled)
332    }
333}
334
335/// Payload sent to the model at the start of each turn.
336///
337/// The [`LoopDriver`] constructs this automatically from its internal state
338/// and passes it to [`ModelSession::begin_turn`].  Model adapter authors
339/// use the fields to build the provider-specific request.
340#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
341pub struct TurnRequest {
342    /// Session this turn belongs to.
343    pub session_id: SessionId,
344    /// Unique identifier for the current turn.
345    pub turn_id: agentkit_core::TurnId,
346    /// Full conversation transcript accumulated so far.
347    pub transcript: Vec<Item>,
348    /// Tool specifications the model may invoke during this turn.
349    pub available_tools: Vec<ToolSpec>,
350    /// Provider-side prompt caching request for this turn.
351    pub cache: Option<PromptCacheRequest>,
352    /// Per-turn metadata (e.g. provider hints).
353    pub metadata: MetadataMap,
354}
355
356/// Final result produced by a single model turn.
357///
358/// Returned inside [`ModelTurnEvent::Finished`] to signal that the model has
359/// completed its generation for this turn.
360#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
361pub struct ModelTurnResult {
362    /// Why the model stopped generating (e.g. completed, tool call, length).
363    pub finish_reason: FinishReason,
364    /// Items the model produced during this turn (text, tool calls, etc.).
365    pub output_items: Vec<Item>,
366    /// Token usage statistics, if available.
367    pub usage: Option<Usage>,
368    /// Provider-specific metadata about the turn.
369    pub metadata: MetadataMap,
370}
371
372/// Streaming event emitted by a [`ModelTurn`] during generation.
373///
374/// The [`LoopDriver`] consumes these events one-by-one via
375/// [`ModelTurn::next_event`] and translates them into [`AgentEvent`]s for
376/// observers and into transcript mutations.
377#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
378pub enum ModelTurnEvent {
379    /// Incremental text or content delta from the model.
380    Delta(Delta),
381    /// The model is requesting a tool call.
382    ToolCall(ToolCallPart),
383    /// Updated token usage statistics.
384    Usage(Usage),
385    /// The model has finished generating for this turn.
386    Finished(ModelTurnResult),
387}
388
389/// Factory for creating model sessions.
390///
391/// Implement this trait to integrate a model provider (e.g. OpenRouter,
392/// Anthropic, a local LLM server) with the agent loop.  [`Agent`] holds a
393/// single adapter and calls [`start_session`](ModelAdapter::start_session)
394/// once when [`Agent::start`] is invoked.
395///
396/// # Example
397///
398/// ```rust,no_run
399/// use agentkit_loop::{ModelAdapter, ModelSession, SessionConfig, LoopError};
400/// use async_trait::async_trait;
401///
402/// struct MyAdapter;
403///
404/// #[async_trait]
405/// impl ModelAdapter for MyAdapter {
406///     type Session = MySession;
407///
408///     async fn start_session(&self, config: SessionConfig) -> Result<MySession, LoopError> {
409///         // Initialize provider-specific session state here.
410///         Ok(MySession { /* ... */ })
411///     }
412/// }
413/// # struct MySession;
414/// # #[async_trait]
415/// # impl ModelSession for MySession {
416/// #     type Turn = MyTurn;
417/// #     async fn begin_turn(&mut self, _r: agentkit_loop::TurnRequest, _c: Option<agentkit_core::TurnCancellation>) -> Result<MyTurn, LoopError> { todo!() }
418/// # }
419/// # struct MyTurn;
420/// # #[async_trait]
421/// # impl agentkit_loop::ModelTurn for MyTurn {
422/// #     async fn next_event(&mut self, _c: Option<agentkit_core::TurnCancellation>) -> Result<Option<agentkit_loop::ModelTurnEvent>, LoopError> { todo!() }
423/// # }
424/// ```
425#[async_trait]
426pub trait ModelAdapter: Send + Sync {
427    /// The session type produced by this adapter.
428    type Session: ModelSession;
429
430    /// Create a new model session from the given configuration.
431    ///
432    /// # Errors
433    ///
434    /// Returns [`LoopError`] if the provider connection or initialisation fails.
435    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
436}
437
438/// An active model session that can produce sequential turns.
439///
440/// A session is created once per [`Agent::start`] call and lives for the
441/// lifetime of the [`LoopDriver`].  Each call to [`begin_turn`](ModelSession::begin_turn)
442/// hands the full transcript to the model and returns a streaming
443/// [`ModelTurn`].
444#[async_trait]
445pub trait ModelSession: Send {
446    /// The turn type produced by this session.
447    type Turn: ModelTurn;
448
449    /// Start a new turn, sending the transcript and available tools to the model.
450    ///
451    /// # Arguments
452    ///
453    /// * `request` -- the turn payload including transcript and tool specs.
454    /// * `cancellation` -- optional handle the implementation should poll to
455    ///   detect user-initiated cancellation.
456    ///
457    /// # Errors
458    ///
459    /// Returns [`LoopError::Cancelled`] when the turn is cancelled, or a
460    /// provider-specific error wrapped in [`LoopError`].
461    async fn begin_turn(
462        &mut self,
463        request: TurnRequest,
464        cancellation: Option<TurnCancellation>,
465    ) -> Result<Self::Turn, LoopError>;
466}
467
468/// A streaming model turn that yields events one at a time.
469///
470/// The loop driver calls [`next_event`](ModelTurn::next_event) repeatedly
471/// until it returns `Ok(None)` (stream exhausted) or
472/// `Ok(Some(ModelTurnEvent::Finished(_)))`.
473#[async_trait]
474pub trait ModelTurn: Send {
475    /// Retrieve the next event from the model's response stream.
476    ///
477    /// Returns `Ok(None)` when the stream is exhausted.
478    ///
479    /// # Errors
480    ///
481    /// Returns [`LoopError::Cancelled`] if `cancellation` fires, or a
482    /// provider-specific error wrapped in [`LoopError`].
483    async fn next_event(
484        &mut self,
485        cancellation: Option<TurnCancellation>,
486    ) -> Result<Option<ModelTurnEvent>, LoopError>;
487}
488
489/// Observer hook for streaming agent events to the host application.
490///
491/// Register observers via [`AgentBuilder::observer`] to receive real-time
492/// notifications about deltas, tool calls, usage, warnings, and lifecycle
493/// events.
494///
495/// # Example
496///
497/// ```rust
498/// use agentkit_loop::{AgentEvent, LoopObserver};
499///
500/// struct StdoutObserver;
501///
502/// impl LoopObserver for StdoutObserver {
503///     fn handle_event(&mut self, event: AgentEvent) {
504///         println!("{event:?}");
505///     }
506/// }
507/// ```
508pub trait LoopObserver: Send {
509    /// Called synchronously for every [`AgentEvent`] emitted by the loop driver.
510    fn handle_event(&mut self, event: AgentEvent);
511}
512
513/// Lifecycle and streaming events emitted by the [`LoopDriver`].
514///
515/// Observers (see [`LoopObserver`]) receive these events in the order they
516/// occur.  They are useful for building UIs, logging, or telemetry.
517#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
518pub enum AgentEvent {
519    /// The agent run has been initialised.
520    RunStarted { session_id: SessionId },
521    /// A new model turn is starting.
522    TurnStarted {
523        session_id: SessionId,
524        turn_id: agentkit_core::TurnId,
525    },
526    /// User input has been accepted into the pending queue.
527    InputAccepted {
528        session_id: SessionId,
529        items: Vec<Item>,
530    },
531    /// Incremental content delta from the model.
532    ContentDelta(Delta),
533    /// The model has requested a tool call.
534    ToolCallRequested(ToolCallPart),
535    /// A tool call requires explicit user approval before execution.
536    ApprovalRequired(ApprovalRequest),
537    /// A tool call requires authentication before execution.
538    AuthRequired(AuthRequest),
539    /// An approval interrupt has been resolved.
540    ApprovalResolved { approved: bool },
541    /// An authentication interrupt has been resolved.
542    AuthResolved { provided: bool },
543    /// Transcript compaction is about to begin.
544    CompactionStarted {
545        session_id: SessionId,
546        turn_id: Option<agentkit_core::TurnId>,
547        reason: CompactionReason,
548    },
549    /// Transcript compaction has finished.
550    CompactionFinished {
551        session_id: SessionId,
552        turn_id: Option<agentkit_core::TurnId>,
553        replaced_items: usize,
554        transcript_len: usize,
555        metadata: MetadataMap,
556    },
557    /// Updated token usage statistics.
558    UsageUpdated(Usage),
559    /// Non-fatal warning (e.g. a tool failure that was recovered from).
560    Warning { message: String },
561    /// The agent run has failed with an unrecoverable error.
562    RunFailed { message: String },
563    /// A turn has finished (successfully, via cancellation, etc.).
564    TurnFinished(TurnResult),
565}
566
567/// Handle for a pending approval interrupt.
568///
569/// Wraps an [`ApprovalRequest`] and provides ergonomic resolution methods
570/// so callers can resolve the interrupt directly instead of searching for
571/// the matching method on [`LoopDriver`].
572///
573/// # Example
574///
575/// ```rust,no_run
576/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
577/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
578/// match driver.next().await? {
579///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
580///         println!("Needs approval: {}", pending.request.summary);
581///         pending.approve(driver)?;
582///     }
583///     _ => {}
584/// }
585/// # Ok(())
586/// # }
587/// ```
588#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
589pub struct PendingApproval {
590    /// The underlying approval request details.
591    pub request: ApprovalRequest,
592}
593
594impl std::ops::Deref for PendingApproval {
595    type Target = ApprovalRequest;
596    fn deref(&self) -> &ApprovalRequest {
597        &self.request
598    }
599}
600
601impl PendingApproval {
602    /// Approve the pending tool call.
603    pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
604        let call_id = self
605            .request
606            .call_id
607            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
608        driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
609    }
610
611    /// Deny the pending tool call.
612    pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
613        let call_id = self
614            .request
615            .call_id
616            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
617        driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
618    }
619
620    /// Deny the pending tool call with a reason.
621    pub fn deny_with_reason<S: ModelSession>(
622        self,
623        driver: &mut LoopDriver<S>,
624        reason: impl Into<String>,
625    ) -> Result<(), LoopError> {
626        let call_id = self
627            .request
628            .call_id
629            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
630        driver.resolve_approval_for(
631            call_id,
632            ApprovalDecision::Deny {
633                reason: Some(reason.into()),
634            },
635        )
636    }
637}
638
639/// Handle for a pending authentication interrupt.
640///
641/// Wraps an [`AuthRequest`] and provides ergonomic resolution methods.
642///
643/// # Example
644///
645/// ```rust,no_run
646/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
647/// # use agentkit_core::MetadataMap;
648/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
649/// match driver.next().await? {
650///     LoopStep::Interrupt(LoopInterrupt::AuthRequest(pending)) => {
651///         println!("Auth required from: {}", pending.request.provider);
652///         pending.cancel(driver)?;
653///     }
654///     _ => {}
655/// }
656/// # Ok(())
657/// # }
658/// ```
659#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
660pub struct PendingAuth {
661    /// The underlying auth request details.
662    pub request: AuthRequest,
663}
664
665impl std::ops::Deref for PendingAuth {
666    type Target = AuthRequest;
667    fn deref(&self) -> &AuthRequest {
668        &self.request
669    }
670}
671
672impl PendingAuth {
673    /// Provide credentials to satisfy the auth request.
674    pub fn provide<S: ModelSession>(
675        self,
676        driver: &mut LoopDriver<S>,
677        credentials: MetadataMap,
678    ) -> Result<(), LoopError> {
679        driver.resolve_auth(AuthResolution::Provided {
680            request: self.request,
681            credentials,
682        })
683    }
684
685    /// Cancel the auth flow.
686    pub fn cancel<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
687        driver.resolve_auth(AuthResolution::Cancelled {
688            request: self.request,
689        })
690    }
691}
692
693/// Descriptor for a [`LoopInterrupt::AwaitingInput`] interrupt.
694///
695/// Returned when the driver has no pending input and needs the host to
696/// supply items before advancing.
697///
698/// # Example
699///
700/// ```rust,no_run
701/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
702/// # use agentkit_core::Item;
703/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>, items: Vec<Item>) -> Result<(), agentkit_loop::LoopError> {
704/// match driver.next().await? {
705///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
706///         pending.submit(driver, items)?;
707///     }
708///     _ => {}
709/// }
710/// # Ok(())
711/// # }
712/// ```
713#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
714pub struct InputRequest {
715    /// The session that is waiting for input.
716    pub session_id: SessionId,
717    /// Human-readable explanation of why input is needed.
718    pub reason: String,
719}
720
721impl InputRequest {
722    /// Submit input items to the driver.
723    pub fn submit<S: ModelSession>(
724        self,
725        driver: &mut LoopDriver<S>,
726        items: Vec<Item>,
727    ) -> Result<(), LoopError> {
728        driver.submit_input(items)
729    }
730}
731
732/// Outcome of a completed (or cancelled) turn.
733///
734/// Wrapped by [`LoopStep::Finished`] and also emitted as
735/// [`AgentEvent::TurnFinished`] to observers.
736#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
737pub struct TurnResult {
738    /// Identifier for the turn that produced this result.
739    pub turn_id: agentkit_core::TurnId,
740    /// Why the turn ended (completed, tool call, cancelled, etc.).
741    pub finish_reason: FinishReason,
742    /// Items produced during this turn (assistant text, tool results, etc.).
743    pub items: Vec<Item>,
744    /// Aggregated token usage, if reported by the model.
745    pub usage: Option<Usage>,
746    /// Additional metadata about the turn.
747    pub metadata: MetadataMap,
748}
749
750/// An interrupt that pauses the agent loop until the host resolves it.
751///
752/// The loop returns an interrupt inside [`LoopStep::Interrupt`] whenever it
753/// cannot proceed autonomously.  Each variant carries a handle with
754/// resolution methods so callers can resolve the interrupt directly.
755///
756/// # Example
757///
758/// ```rust,no_run
759/// use agentkit_loop::{LoopInterrupt, LoopStep};
760/// # use agentkit_loop::LoopDriver;
761///
762/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
763/// match driver.next().await? {
764///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
765///         println!("Tool {} needs approval: {}", pending.request.request_kind, pending.request.summary);
766///         pending.approve(driver)?;
767///     }
768///     LoopStep::Interrupt(LoopInterrupt::AuthRequest(pending)) => {
769///         println!("Auth required from provider: {}", pending.request.provider);
770///         pending.cancel(driver)?;
771///     }
772///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
773///         println!("Waiting for input: {}", pending.reason);
774///         // ... call pending.submit(driver, items)
775///     }
776///     LoopStep::Finished(result) => {
777///         println!("Turn finished: {:?}", result.finish_reason);
778///     }
779/// }
780/// # Ok(())
781/// # }
782/// ```
783#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
784pub enum LoopInterrupt {
785    /// A tool call requires explicit approval before it can execute.
786    ApprovalRequest(PendingApproval),
787    /// A tool call requires authentication credentials.
788    AuthRequest(PendingAuth),
789    /// The driver has no pending input and needs the host to supply some.
790    AwaitingInput(InputRequest),
791}
792
793/// The result of advancing the agent loop by one step.
794///
795/// Returned by [`LoopDriver::next`].  The host should pattern-match on this
796/// to decide whether to continue the loop or resolve an interrupt first.
797///
798/// # Example
799///
800/// ```rust,no_run
801/// use agentkit_loop::LoopStep;
802/// # use agentkit_loop::LoopDriver;
803///
804/// # async fn run<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
805/// loop {
806///     match driver.next().await? {
807///         LoopStep::Finished(result) => {
808///             println!("Turn complete: {:?}", result.finish_reason);
809///             break;
810///         }
811///         LoopStep::Interrupt(interrupt) => {
812///             // Resolve the interrupt, then continue the loop.
813///             # break;
814///         }
815///     }
816/// }
817/// # Ok(())
818/// # }
819/// ```
820#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
821pub enum LoopStep {
822    /// The loop is paused and requires host action.
823    Interrupt(LoopInterrupt),
824    /// A turn has completed (or been cancelled).
825    Finished(TurnResult),
826}
827
828/// A read-only snapshot of the loop driver's current state.
829///
830/// Obtained via [`LoopDriver::snapshot`].  Useful for persisting or
831/// inspecting the conversation transcript without holding a mutable
832/// reference to the driver.
833#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
834pub struct LoopSnapshot {
835    /// Session identifier.
836    pub session_id: SessionId,
837    /// The full transcript accumulated so far.
838    pub transcript: Vec<Item>,
839    /// Input items queued but not yet consumed by a turn.
840    pub pending_input: Vec<Item>,
841}
842
843#[derive(Clone, Debug)]
844struct PendingApprovalToolCall {
845    request: ApprovalRequest,
846    decision: Option<ApprovalDecision>,
847    surfaced: bool,
848    turn_id: agentkit_core::TurnId,
849    task_id: TaskId,
850    call: ToolCallPart,
851    tool_request: ToolRequest,
852}
853
854#[derive(Clone, Debug)]
855struct PendingAuthToolCall {
856    request: AuthRequest,
857    resolution: Option<AuthResolution>,
858    turn_id: agentkit_core::TurnId,
859    task_id: TaskId,
860    call: ToolCallPart,
861    tool_request: ToolRequest,
862}
863
864#[derive(Clone, Debug, Default)]
865struct ActiveToolRound {
866    turn_id: agentkit_core::TurnId,
867    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
868    background_pending: bool,
869    foreground_progressed: bool,
870}
871
872/// A configured agent ready to start a session.
873///
874/// Build one with [`Agent::builder`], supplying at minimum a [`ModelAdapter`].
875/// Then call [`Agent::start`] with a [`SessionConfig`] to obtain a
876/// [`LoopDriver`] that drives the agentic loop.
877///
878/// # Example
879///
880/// ```rust,no_run
881/// use agentkit_loop::{
882///     Agent, LoopStep, PromptCacheRequest, PromptCacheRetention, SessionConfig,
883/// };
884/// use agentkit_tools_core::ToolRegistry;
885///
886/// # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
887/// let agent = Agent::builder()
888///     .model(adapter)
889///     .tools(ToolRegistry::new())
890///     .build()?;
891///
892/// let mut driver = agent
893///     .start(
894///         SessionConfig::new("s1").with_cache(
895///             PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
896///         ),
897///     )
898///     .await?;
899///
900/// // Submit input and advance
901/// # Ok(())
902/// # }
903/// ```
904pub struct Agent<M>
905where
906    M: ModelAdapter,
907{
908    model: M,
909    tools: ToolRegistry,
910    task_manager: Arc<dyn TaskManager>,
911    permissions: Arc<dyn PermissionChecker>,
912    resources: Arc<dyn ToolResources>,
913    cancellation: Option<CancellationHandle>,
914    compaction: Option<CompactionConfig>,
915    observers: Vec<Box<dyn LoopObserver>>,
916}
917
918impl<M> Agent<M>
919where
920    M: ModelAdapter,
921{
922    /// Create a new [`AgentBuilder`] for configuring this agent.
923    pub fn builder() -> AgentBuilder<M> {
924        AgentBuilder::default()
925    }
926
927    /// Consume the agent and start a new session, returning a [`LoopDriver`].
928    ///
929    /// This calls [`ModelAdapter::start_session`] and emits an
930    /// [`AgentEvent::RunStarted`] event to all registered observers.
931    ///
932    /// # Errors
933    ///
934    /// Returns [`LoopError`] if the model adapter fails to create a session.
935    pub async fn start(self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
936        let session_id = config.session_id.clone();
937        let default_cache = config.cache.clone();
938        let session = self.model.start_session(config).await?;
939        let tool_executor = Arc::new(BasicToolExecutor::new(self.tools.clone()));
940        let mut driver = LoopDriver {
941            session_id: session_id.clone(),
942            default_cache,
943            next_turn_cache: None,
944            session: Some(session),
945            tool_executor,
946            task_manager: self.task_manager,
947            permissions: self.permissions,
948            resources: self.resources,
949            cancellation: self.cancellation,
950            compaction: self.compaction,
951            observers: self.observers,
952            transcript: Vec::new(),
953            pending_input: Vec::new(),
954            pending_approvals: BTreeMap::new(),
955            pending_approval_order: VecDeque::new(),
956            pending_auth: None,
957            active_tool_round: None,
958            next_turn_index: 1,
959        };
960        driver.emit(AgentEvent::RunStarted { session_id });
961        Ok(driver)
962    }
963}
964
965/// Builder for constructing an [`Agent`].
966///
967/// Obtained via [`Agent::builder`].  The only required field is
968/// [`model`](AgentBuilder::model); all others have sensible defaults
969/// (no tools, allow-all permissions, no compaction, no observers).
970pub struct AgentBuilder<M>
971where
972    M: ModelAdapter,
973{
974    model: Option<M>,
975    tools: ToolRegistry,
976    task_manager: Option<Arc<dyn TaskManager>>,
977    permissions: Arc<dyn PermissionChecker>,
978    resources: Arc<dyn ToolResources>,
979    cancellation: Option<CancellationHandle>,
980    compaction: Option<CompactionConfig>,
981    observers: Vec<Box<dyn LoopObserver>>,
982}
983
984impl<M> Default for AgentBuilder<M>
985where
986    M: ModelAdapter,
987{
988    fn default() -> Self {
989        Self {
990            model: None,
991            tools: ToolRegistry::new(),
992            task_manager: None,
993            permissions: Arc::new(AllowAllPermissions),
994            resources: Arc::new(()),
995            cancellation: None,
996            compaction: None,
997            observers: Vec::new(),
998        }
999    }
1000}
1001
1002impl<M> AgentBuilder<M>
1003where
1004    M: ModelAdapter,
1005{
1006    /// Set the model adapter (required).
1007    pub fn model(mut self, model: M) -> Self {
1008        self.model = Some(model);
1009        self
1010    }
1011
1012    /// Set the tool registry.  Defaults to an empty [`ToolRegistry`].
1013    pub fn tools(mut self, tools: ToolRegistry) -> Self {
1014        self.tools = tools;
1015        self
1016    }
1017
1018    /// Set the task manager that schedules tool-call execution.
1019    ///
1020    /// Defaults to [`SimpleTaskManager`], which preserves the existing
1021    /// sequential request/response behavior.
1022    pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1023        self.task_manager = Some(Arc::new(manager));
1024        self
1025    }
1026
1027    /// Set the permission checker that gates tool execution.
1028    ///
1029    /// Defaults to allowing all tool calls without prompting.
1030    pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1031        self.permissions = Arc::new(permissions);
1032        self
1033    }
1034
1035    /// Set shared resources available to tool implementations.
1036    pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1037        self.resources = Arc::new(resources);
1038        self
1039    }
1040
1041    /// Attach a [`CancellationHandle`] for cooperative cancellation of turns.
1042    pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1043        self.cancellation = Some(handle);
1044        self
1045    }
1046
1047    /// Enable transcript compaction with the given configuration.
1048    ///
1049    /// When configured, the driver checks the compaction trigger before each
1050    /// turn and applies the compaction strategy if the transcript is too long.
1051    pub fn compaction(mut self, config: CompactionConfig) -> Self {
1052        self.compaction = Some(config);
1053        self
1054    }
1055
1056    /// Register a [`LoopObserver`] that receives [`AgentEvent`]s.
1057    ///
1058    /// Multiple observers may be registered; they are called in order.
1059    pub fn observer(mut self, observer: impl LoopObserver + 'static) -> Self {
1060        self.observers.push(Box::new(observer));
1061        self
1062    }
1063
1064    /// Consume the builder and produce an [`Agent`].
1065    ///
1066    /// # Errors
1067    ///
1068    /// Returns [`LoopError::InvalidState`] if no model adapter was provided.
1069    pub fn build(self) -> Result<Agent<M>, LoopError> {
1070        let model = self
1071            .model
1072            .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1073        Ok(Agent {
1074            model,
1075            tools: self.tools,
1076            task_manager: self
1077                .task_manager
1078                .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1079            permissions: self.permissions,
1080            resources: self.resources,
1081            cancellation: self.cancellation,
1082            compaction: self.compaction,
1083            observers: self.observers,
1084        })
1085    }
1086}
1087
1088/// The runtime driver that advances the agent loop step by step.
1089///
1090/// Obtained from [`Agent::start`].  The typical usage pattern is:
1091///
1092/// 1. Call [`submit_input`](LoopDriver::submit_input) to enqueue user messages.
1093/// 2. Call [`next`](LoopDriver::next) to run the next turn.
1094/// 3. Handle the returned [`LoopStep`]:
1095///    - [`LoopStep::Finished`] -- the turn completed, inspect the result.
1096///    - [`LoopStep::Interrupt`] -- resolve the interrupt and call `next` again.
1097///
1098/// # Example
1099///
1100/// ```rust,no_run
1101/// use agentkit_core::{Item, ItemKind};
1102/// use agentkit_loop::{LoopDriver, LoopStep};
1103///
1104/// # async fn drive<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
1105/// driver.submit_input(vec![Item::text(ItemKind::User, "Hello!")])?;
1106///
1107/// let step = driver.next().await?;
1108/// match step {
1109///     LoopStep::Finished(result) => println!("Done: {:?}", result.finish_reason),
1110///     LoopStep::Interrupt(interrupt) => {
1111///         // Resolve the interrupt (approval, auth, or input), then call next() again.
1112///         println!("Interrupted: {interrupt:?}");
1113///     }
1114/// }
1115/// # Ok(())
1116/// # }
1117/// ```
1118pub struct LoopDriver<S>
1119where
1120    S: ModelSession,
1121{
1122    session_id: SessionId,
1123    default_cache: Option<PromptCacheRequest>,
1124    next_turn_cache: Option<PromptCacheRequest>,
1125    session: Option<S>,
1126    tool_executor: Arc<dyn ToolExecutor>,
1127    task_manager: Arc<dyn TaskManager>,
1128    permissions: Arc<dyn PermissionChecker>,
1129    resources: Arc<dyn ToolResources>,
1130    cancellation: Option<CancellationHandle>,
1131    compaction: Option<CompactionConfig>,
1132    observers: Vec<Box<dyn LoopObserver>>,
1133    transcript: Vec<Item>,
1134    pending_input: Vec<Item>,
1135    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
1136    pending_approval_order: VecDeque<ToolCallId>,
1137    pending_auth: Option<PendingAuthToolCall>,
1138    active_tool_round: Option<ActiveToolRound>,
1139    next_turn_index: u64,
1140}
1141
1142impl<S> LoopDriver<S>
1143where
1144    S: ModelSession,
1145{
1146    fn start_task_via_manager(
1147        &self,
1148        task_id: Option<TaskId>,
1149        tool_request: ToolRequest,
1150        approved_request: Option<ApprovalRequest>,
1151        cancellation: Option<TurnCancellation>,
1152    ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1153    {
1154        let task_manager = self.task_manager.clone();
1155        let tool_executor = self.tool_executor.clone();
1156        let permissions = self.permissions.clone();
1157        let resources = self.resources.clone();
1158        let session_id = self.session_id.clone();
1159        let turn_id = tool_request.turn_id.clone();
1160        let metadata = tool_request.metadata.clone();
1161
1162        async move {
1163            task_manager
1164                .start_task(
1165                    TaskLaunchRequest {
1166                        task_id,
1167                        request: tool_request.clone(),
1168                        approved_request,
1169                    },
1170                    TaskStartContext {
1171                        executor: tool_executor,
1172                        tool_context: OwnedToolContext {
1173                            session_id,
1174                            turn_id,
1175                            metadata,
1176                            permissions,
1177                            resources,
1178                            cancellation,
1179                        },
1180                    },
1181                )
1182                .await
1183                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1184        }
1185    }
1186
1187    fn has_pending_interrupts(&self) -> bool {
1188        self.pending_auth.is_some() || !self.pending_approvals.is_empty()
1189    }
1190
1191    fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1192        let call_id = task.tool_request.call_id.clone();
1193        let call = ToolCallPart {
1194            id: call_id.clone(),
1195            name: task.tool_request.tool_name.to_string(),
1196            input: task.tool_request.input.clone(),
1197            metadata: task.tool_request.metadata.clone(),
1198        };
1199        let mut request = task.approval;
1200        request.call_id = Some(call_id.clone());
1201        let pending = PendingApprovalToolCall {
1202            request: request.clone(),
1203            decision: None,
1204            surfaced: false,
1205            turn_id: turn_id.clone(),
1206            task_id: task.task_id,
1207            call,
1208            tool_request: task.tool_request,
1209        };
1210        self.pending_approvals.insert(call_id.clone(), pending);
1211        if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1212            self.pending_approval_order.push_back(call_id);
1213        }
1214        self.emit(AgentEvent::ApprovalRequired(request));
1215    }
1216
1217    fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1218        for call_id in self.pending_approval_order.clone() {
1219            let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1220                continue;
1221            };
1222            if pending.decision.is_none() && !pending.surfaced {
1223                pending.surfaced = true;
1224                return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1225                    PendingApproval {
1226                        request: pending.request.clone(),
1227                    },
1228                )));
1229            }
1230        }
1231        None
1232    }
1233
1234    fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1235        self.pending_approval_order.iter().find_map(|call_id| {
1236            self.pending_approvals.get(call_id).and_then(|pending| {
1237                pending.decision.is_none().then(|| {
1238                    LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1239                        request: pending.request.clone(),
1240                    }))
1241                })
1242            })
1243        })
1244    }
1245
1246    fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1247        let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1248            self.pending_approvals
1249                .get(call_id)
1250                .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1251        })?;
1252        self.pending_approval_order.retain(|id| id != &call_id);
1253        self.pending_approvals.remove(&call_id)
1254    }
1255
1256    fn pending_auth_interrupt(&self) -> Option<LoopStep> {
1257        self.pending_auth.as_ref().and_then(|pending| {
1258            pending.resolution.is_none().then(|| {
1259                LoopStep::Interrupt(LoopInterrupt::AuthRequest(PendingAuth {
1260                    request: pending.request.clone(),
1261                }))
1262            })
1263        })
1264    }
1265
1266    fn queue_auth_interrupt(
1267        &mut self,
1268        turn_id: &agentkit_core::TurnId,
1269        task: TaskAuth,
1270    ) -> LoopStep {
1271        let call = ToolCallPart {
1272            id: task.tool_request.call_id.clone(),
1273            name: task.tool_request.tool_name.to_string(),
1274            input: task.tool_request.input.clone(),
1275            metadata: task.tool_request.metadata.clone(),
1276        };
1277        let request = upgrade_auth_request(task.auth, &task.tool_request, &call);
1278        self.pending_auth = Some(PendingAuthToolCall {
1279            request: request.clone(),
1280            resolution: None,
1281            turn_id: turn_id.clone(),
1282            task_id: task.task_id,
1283            call,
1284            tool_request: task.tool_request,
1285        });
1286        self.emit(AgentEvent::AuthRequired(request.clone()));
1287        LoopStep::Interrupt(LoopInterrupt::AuthRequest(PendingAuth { request }))
1288    }
1289
1290    fn queue_resolution_interrupt(
1291        &mut self,
1292        turn_id: &agentkit_core::TurnId,
1293        resolution: TaskResolution,
1294    ) -> Option<LoopStep> {
1295        match resolution {
1296            TaskResolution::Item(item) => {
1297                self.transcript.push(item);
1298                None
1299            }
1300            TaskResolution::Approval(task) => {
1301                self.enqueue_pending_approval(turn_id, task);
1302                self.take_next_unsurfaced_approval_interrupt()
1303            }
1304            TaskResolution::Auth(task) => Some(self.queue_auth_interrupt(turn_id, task)),
1305        }
1306    }
1307
1308    async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1309        let PendingLoopUpdates { mut resolutions } = self
1310            .task_manager
1311            .take_pending_loop_updates()
1312            .await
1313            .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1314        let mut saw_items = false;
1315        while let Some(resolution) = resolutions.pop_front() {
1316            match resolution {
1317                TaskResolution::Item(item) => {
1318                    self.transcript.push(item);
1319                    saw_items = true;
1320                }
1321                TaskResolution::Approval(task) => {
1322                    self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1323                }
1324                TaskResolution::Auth(task) => {
1325                    return Ok((
1326                        saw_items,
1327                        Some(self.queue_auth_interrupt(&task.tool_request.turn_id.clone(), task)),
1328                    ));
1329                }
1330            }
1331        }
1332        Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1333    }
1334
1335    async fn maybe_compact(
1336        &mut self,
1337        turn_id: Option<&agentkit_core::TurnId>,
1338        cancellation: Option<TurnCancellation>,
1339    ) -> Result<(), LoopError> {
1340        let Some(compaction) = self.compaction.as_ref().cloned() else {
1341            return Ok(());
1342        };
1343        if cancellation
1344            .as_ref()
1345            .is_some_and(TurnCancellation::is_cancelled)
1346        {
1347            return Err(LoopError::Cancelled);
1348        }
1349        let Some(reason) =
1350            compaction
1351                .trigger
1352                .should_compact(&self.session_id, turn_id, &self.transcript)
1353        else {
1354            return Ok(());
1355        };
1356
1357        self.emit(AgentEvent::CompactionStarted {
1358            session_id: self.session_id.clone(),
1359            turn_id: turn_id.cloned(),
1360            reason: reason.clone(),
1361        });
1362
1363        let CompactionResult {
1364            transcript,
1365            replaced_items,
1366            metadata,
1367        } = compaction
1368            .strategy
1369            .apply(
1370                agentkit_compaction::CompactionRequest {
1371                    session_id: self.session_id.clone(),
1372                    turn_id: turn_id.cloned(),
1373                    transcript: self.transcript.clone(),
1374                    reason,
1375                    metadata: compaction.metadata.clone(),
1376                },
1377                &mut CompactionContext {
1378                    backend: compaction.backend.as_deref(),
1379                    metadata: &compaction.metadata,
1380                    cancellation,
1381                },
1382            )
1383            .await
1384            .map_err(|error| match error {
1385                agentkit_compaction::CompactionError::Cancelled => LoopError::Cancelled,
1386                other => LoopError::Compaction(other.to_string()),
1387            })?;
1388
1389        self.transcript = transcript;
1390        self.emit(AgentEvent::CompactionFinished {
1391            session_id: self.session_id.clone(),
1392            turn_id: turn_id.cloned(),
1393            replaced_items,
1394            transcript_len: self.transcript.len(),
1395            metadata,
1396        });
1397        Ok(())
1398    }
1399
1400    async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
1401        let Some(_) = self.active_tool_round.as_ref() else {
1402            return Ok(None);
1403        };
1404        loop {
1405            let cancellation = self
1406                .cancellation
1407                .as_ref()
1408                .map(CancellationHandle::checkpoint);
1409            let turn_id = self
1410                .active_tool_round
1411                .as_ref()
1412                .map(|active| active.turn_id.clone())
1413                .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;
1414
1415            if cancellation
1416                .as_ref()
1417                .is_some_and(TurnCancellation::is_cancelled)
1418            {
1419                self.task_manager
1420                    .on_turn_interrupted(&turn_id)
1421                    .await
1422                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1423                self.active_tool_round = None;
1424                return self.finish_cancelled(turn_id, Vec::new()).map(Some);
1425            }
1426
1427            let next_call = self
1428                .active_tool_round
1429                .as_mut()
1430                .and_then(|active| active.pending_calls.pop_front());
1431            if let Some((_call, tool_request)) = next_call {
1432                match self
1433                    .start_task_via_manager(None, tool_request.clone(), None, cancellation.clone())
1434                    .await?
1435                {
1436                    TaskStartOutcome::Ready(resolution) => {
1437                        let resolution = *resolution;
1438                        match resolution {
1439                            TaskResolution::Item(item) => {
1440                                if let Some(active) = self.active_tool_round.as_mut() {
1441                                    active.foreground_progressed = true;
1442                                }
1443                                self.transcript.push(item);
1444                            }
1445                            TaskResolution::Approval(task) => {
1446                                self.enqueue_pending_approval(&turn_id, task);
1447                            }
1448                            TaskResolution::Auth(task) => {
1449                                return Ok(Some(self.queue_auth_interrupt(&turn_id, task)));
1450                            }
1451                        }
1452                        continue;
1453                    }
1454                    TaskStartOutcome::Pending { kind, .. } => {
1455                        if kind == agentkit_task_manager::TaskKind::Background
1456                            && let Some(active) = self.active_tool_round.as_mut()
1457                        {
1458                            active.background_pending = true;
1459                        }
1460                        continue;
1461                    }
1462                }
1463            }
1464
1465            match self
1466                .task_manager
1467                .wait_for_turn(&turn_id, cancellation.clone())
1468                .await
1469                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
1470            {
1471                Some(TurnTaskUpdate::Resolution(resolution)) => {
1472                    let resolution = *resolution;
1473                    match resolution {
1474                        TaskResolution::Item(item) => {
1475                            if let Some(active) = self.active_tool_round.as_mut() {
1476                                active.foreground_progressed = true;
1477                            }
1478                            self.transcript.push(item);
1479                        }
1480                        TaskResolution::Approval(task) => {
1481                            self.enqueue_pending_approval(&turn_id, task);
1482                        }
1483                        TaskResolution::Auth(task) => {
1484                            return Ok(Some(self.queue_auth_interrupt(&turn_id, task)));
1485                        }
1486                    }
1487                }
1488                Some(TurnTaskUpdate::Detached(snapshot)) => {
1489                    // The task was promoted to background. Push a synthetic
1490                    // tool result so the model knows the call is still
1491                    // running and can continue its turn.
1492                    self.transcript.push(Item {
1493                        id: None,
1494                        kind: ItemKind::Tool,
1495                        parts: vec![Part::ToolResult(ToolResultPart {
1496                            call_id: snapshot.call_id,
1497                            output: ToolOutput::Text(format!(
1498                                "Tool {} is now running in the background. \
1499                                 The result will be delivered when it completes.",
1500                                snapshot.tool_name,
1501                            )),
1502                            is_error: false,
1503                            metadata: MetadataMap::new(),
1504                        })],
1505                        metadata: MetadataMap::new(),
1506                    });
1507                    if let Some(active) = self.active_tool_round.as_mut() {
1508                        active.background_pending = true;
1509                        active.foreground_progressed = true;
1510                    }
1511                }
1512                None => {
1513                    if cancellation
1514                        .as_ref()
1515                        .is_some_and(TurnCancellation::is_cancelled)
1516                    {
1517                        self.task_manager
1518                            .on_turn_interrupted(&turn_id)
1519                            .await
1520                            .map_err(|error| {
1521                                LoopError::Tool(ToolError::Internal(error.to_string()))
1522                            })?;
1523                        self.active_tool_round = None;
1524                        return self.finish_cancelled(turn_id, Vec::new()).map(Some);
1525                    }
1526                    let active = self.active_tool_round.take().ok_or_else(|| {
1527                        LoopError::InvalidState("missing active tool round".into())
1528                    })?;
1529                    if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1530                        return Ok(Some(step));
1531                    }
1532                    if let Some(step) = self.pending_auth_interrupt() {
1533                        return Ok(Some(step));
1534                    }
1535                    if let Some(step) = self.next_unresolved_approval_interrupt() {
1536                        return Ok(Some(step));
1537                    }
1538                    if active.background_pending && !active.foreground_progressed {
1539                        return Ok(None);
1540                    }
1541                    return Ok(Some(Box::pin(self.drive_turn(turn_id, false)).await?));
1542                }
1543            }
1544        }
1545    }
1546
1547    async fn drive_turn(
1548        &mut self,
1549        turn_id: agentkit_core::TurnId,
1550        emit_started: bool,
1551    ) -> Result<LoopStep, LoopError> {
1552        let cancellation = self
1553            .cancellation
1554            .as_ref()
1555            .map(CancellationHandle::checkpoint);
1556        match self
1557            .maybe_compact(Some(&turn_id), cancellation.clone())
1558            .await
1559        {
1560            Ok(()) => {}
1561            Err(LoopError::Cancelled) => {
1562                return self.finish_cancelled(turn_id, interrupted_assistant_items());
1563            }
1564            Err(error) => return Err(error),
1565        }
1566        if emit_started {
1567            self.emit(AgentEvent::TurnStarted {
1568                session_id: self.session_id.clone(),
1569                turn_id: turn_id.clone(),
1570            });
1571        }
1572        if cancellation
1573            .as_ref()
1574            .is_some_and(TurnCancellation::is_cancelled)
1575        {
1576            return self.finish_cancelled(turn_id, interrupted_assistant_items());
1577        }
1578
1579        let request = TurnRequest {
1580            session_id: self.session_id.clone(),
1581            turn_id: turn_id.clone(),
1582            transcript: self.transcript.clone(),
1583            available_tools: self.tool_executor.specs(),
1584            cache: self
1585                .next_turn_cache
1586                .take()
1587                .or_else(|| self.default_cache.clone()),
1588            metadata: MetadataMap::new(),
1589        };
1590
1591        let session = self
1592            .session
1593            .as_mut()
1594            .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
1595        let mut turn = match session.begin_turn(request, cancellation.clone()).await {
1596            Ok(turn) => turn,
1597            Err(LoopError::Cancelled) => {
1598                self.task_manager
1599                    .on_turn_interrupted(&turn_id)
1600                    .await
1601                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1602                return self.finish_cancelled(turn_id, interrupted_assistant_items());
1603            }
1604            Err(error) => return Err(error),
1605        };
1606        let mut saw_tool_call = false;
1607        let mut finished_result = None;
1608
1609        while let Some(event) = match turn.next_event(cancellation.clone()).await {
1610            Ok(event) => event,
1611            Err(LoopError::Cancelled) => {
1612                self.task_manager
1613                    .on_turn_interrupted(&turn_id)
1614                    .await
1615                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1616                return self.finish_cancelled(turn_id, interrupted_assistant_items());
1617            }
1618            Err(error) => return Err(error),
1619        } {
1620            if cancellation
1621                .as_ref()
1622                .is_some_and(TurnCancellation::is_cancelled)
1623            {
1624                self.task_manager
1625                    .on_turn_interrupted(&turn_id)
1626                    .await
1627                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1628                return self.finish_cancelled(turn_id, interrupted_assistant_items());
1629            }
1630            match event {
1631                ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
1632                ModelTurnEvent::Usage(usage) => self.emit(AgentEvent::UsageUpdated(usage)),
1633                ModelTurnEvent::ToolCall(call) => {
1634                    saw_tool_call = true;
1635                    self.emit(AgentEvent::ToolCallRequested(call.clone()));
1636                }
1637                ModelTurnEvent::Finished(result) => {
1638                    finished_result = Some(result);
1639                    break;
1640                }
1641            }
1642        }
1643
1644        let result = finished_result.ok_or_else(|| {
1645            LoopError::Provider("model turn ended without a Finished event".into())
1646        })?;
1647        self.transcript.extend(result.output_items.clone());
1648
1649        if saw_tool_call {
1650            let pending_calls = extract_tool_calls(&result.output_items)
1651                .into_iter()
1652                .map(|call| {
1653                    let tool_request = ToolRequest {
1654                        call_id: call.id.clone(),
1655                        tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
1656                        input: call.input.clone(),
1657                        session_id: self.session_id.clone(),
1658                        turn_id: turn_id.clone(),
1659                        metadata: call.metadata.clone(),
1660                    };
1661                    (call, tool_request)
1662                })
1663                .collect();
1664            self.active_tool_round = Some(ActiveToolRound {
1665                turn_id: turn_id.clone(),
1666                pending_calls,
1667                background_pending: false,
1668                foreground_progressed: false,
1669            });
1670            if let Some(step) = self.continue_active_tool_round().await? {
1671                return Ok(step);
1672            }
1673            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
1674                InputRequest {
1675                    session_id: self.session_id.clone(),
1676                    reason: "driver is waiting for input".into(),
1677                },
1678            )));
1679        }
1680
1681        let turn_result = TurnResult {
1682            turn_id,
1683            finish_reason: result.finish_reason,
1684            items: result.output_items,
1685            usage: result.usage,
1686            metadata: result.metadata,
1687        };
1688        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1689        Ok(LoopStep::Finished(turn_result))
1690    }
1691
1692    async fn resume_after_auth(
1693        &mut self,
1694        pending: PendingAuthToolCall,
1695    ) -> Result<LoopStep, LoopError> {
1696        let resolution = pending
1697            .resolution
1698            .clone()
1699            .ok_or_else(|| LoopError::InvalidState("pending auth has no resolution".into()))?;
1700        match resolution {
1701            AuthResolution::Provided { .. } => match self
1702                .start_task_via_manager(
1703                    Some(pending.task_id.clone()),
1704                    pending.tool_request.clone(),
1705                    None,
1706                    self.cancellation
1707                        .as_ref()
1708                        .map(CancellationHandle::checkpoint),
1709                )
1710                .await?
1711            {
1712                TaskStartOutcome::Ready(resolution) => {
1713                    let resolution = *resolution;
1714                    if let Some(step) =
1715                        self.queue_resolution_interrupt(&pending.turn_id, resolution)
1716                    {
1717                        return Ok(step);
1718                    }
1719                }
1720                TaskStartOutcome::Pending { .. } => {}
1721            },
1722            AuthResolution::Cancelled { .. } => {
1723                self.transcript.push(Item {
1724                    id: None,
1725                    kind: ItemKind::Tool,
1726                    parts: vec![Part::ToolResult(ToolResultPart {
1727                        call_id: pending.call.id.clone(),
1728                        output: ToolOutput::Text("auth cancelled".into()),
1729                        is_error: true,
1730                        metadata: pending.call.metadata.clone(),
1731                    })],
1732                    metadata: MetadataMap::new(),
1733                });
1734            }
1735        }
1736
1737        if let Some(step) = self.continue_active_tool_round().await? {
1738            Ok(step)
1739        } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1740            Ok(step)
1741        } else if let Some(step) = self.pending_auth_interrupt() {
1742            Ok(step)
1743        } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1744            Ok(step)
1745        } else {
1746            self.drive_turn(pending.turn_id, false).await
1747        }
1748    }
1749
1750    async fn resume_after_approval(
1751        &mut self,
1752        pending: PendingApprovalToolCall,
1753    ) -> Result<LoopStep, LoopError> {
1754        let decision = pending
1755            .decision
1756            .clone()
1757            .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
1758
1759        match decision {
1760            ApprovalDecision::Approve => match self
1761                .start_task_via_manager(
1762                    Some(pending.task_id.clone()),
1763                    pending.tool_request.clone(),
1764                    Some(pending.request.clone()),
1765                    self.cancellation
1766                        .as_ref()
1767                        .map(CancellationHandle::checkpoint),
1768                )
1769                .await?
1770            {
1771                TaskStartOutcome::Ready(resolution) => {
1772                    let resolution = *resolution;
1773                    if let Some(step) =
1774                        self.queue_resolution_interrupt(&pending.turn_id, resolution)
1775                    {
1776                        return Ok(step);
1777                    }
1778                }
1779                TaskStartOutcome::Pending { .. } => {}
1780            },
1781            ApprovalDecision::Deny { reason } => {
1782                self.transcript.push(Item {
1783                    id: None,
1784                    kind: ItemKind::Tool,
1785                    parts: vec![Part::ToolResult(ToolResultPart {
1786                        call_id: pending.call.id.clone(),
1787                        output: ToolOutput::Text(
1788                            reason.unwrap_or_else(|| "approval denied".into()),
1789                        ),
1790                        is_error: true,
1791                        metadata: pending.call.metadata.clone(),
1792                    })],
1793                    metadata: MetadataMap::new(),
1794                });
1795            }
1796        }
1797
1798        if let Some(step) = self.continue_active_tool_round().await? {
1799            Ok(step)
1800        } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1801            Ok(step)
1802        } else if let Some(step) = self.pending_auth_interrupt() {
1803            Ok(step)
1804        } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1805            Ok(step)
1806        } else {
1807            self.drive_turn(pending.turn_id, false).await
1808        }
1809    }
1810
1811    fn finish_cancelled(
1812        &mut self,
1813        turn_id: agentkit_core::TurnId,
1814        items: Vec<Item>,
1815    ) -> Result<LoopStep, LoopError> {
1816        self.transcript.extend(items.clone());
1817        let turn_result = TurnResult {
1818            turn_id,
1819            finish_reason: FinishReason::Cancelled,
1820            items,
1821            usage: None,
1822            metadata: interrupted_metadata("turn"),
1823        };
1824        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1825        Ok(LoopStep::Finished(turn_result))
1826    }
1827
1828    /// Enqueue user input items for the next turn.
1829    ///
1830    /// Items are buffered and consumed the next time [`next`](LoopDriver::next)
1831    /// is called.  Must not be called while an interrupt is pending.
1832    ///
1833    /// # Errors
1834    ///
1835    /// Returns [`LoopError::InvalidState`] if an interrupt is still unresolved.
1836    pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
1837        if self.has_pending_interrupts() {
1838            return Err(LoopError::InvalidState(
1839                "cannot submit input while an interrupt is pending".into(),
1840            ));
1841        }
1842        self.emit(AgentEvent::InputAccepted {
1843            session_id: self.session_id.clone(),
1844            items: input.clone(),
1845        });
1846        self.pending_input.extend(input);
1847        Ok(())
1848    }
1849
1850    /// Override the prompt cache request for the next model turn.
1851    ///
1852    /// The override is consumed the next time the driver starts a model turn.
1853    /// Session-level defaults still apply to later turns.
1854    pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
1855        if self.has_pending_interrupts() {
1856            return Err(LoopError::InvalidState(
1857                "cannot update next-turn cache while an interrupt is pending".into(),
1858            ));
1859        }
1860        self.next_turn_cache = Some(cache);
1861        Ok(())
1862    }
1863
1864    /// Enqueue user input and set a prompt cache override for the next model
1865    /// turn in one call.
1866    pub fn submit_input_with_cache(
1867        &mut self,
1868        input: Vec<Item>,
1869        cache: PromptCacheRequest,
1870    ) -> Result<(), LoopError> {
1871        self.set_next_turn_cache(cache)?;
1872        self.submit_input(input)
1873    }
1874
1875    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`].
1876    ///
1877    /// After calling this, invoke [`next`](LoopDriver::next) to continue the
1878    /// loop.  If the decision is [`ApprovalDecision::Approve`] the tool call
1879    /// executes; if denied, an error result is fed back to the model.
1880    ///
1881    /// # Errors
1882    ///
1883    /// Returns [`LoopError::InvalidState`] if no approval is pending.
1884    pub fn resolve_approval_for(
1885        &mut self,
1886        call_id: ToolCallId,
1887        decision: ApprovalDecision,
1888    ) -> Result<(), LoopError> {
1889        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1890            return Err(LoopError::InvalidState(format!(
1891                "no approval request is pending for call {}",
1892                call_id.0
1893            )));
1894        };
1895        pending.decision = Some(decision.clone());
1896        self.emit(AgentEvent::ApprovalResolved {
1897            approved: matches!(decision, ApprovalDecision::Approve),
1898        });
1899        Ok(())
1900    }
1901
1902    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] when exactly one
1903    /// approval is outstanding.
1904    pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
1905        let mut unresolved = self
1906            .pending_approval_order
1907            .iter()
1908            .filter(|call_id| {
1909                self.pending_approvals
1910                    .get(*call_id)
1911                    .is_some_and(|pending| pending.decision.is_none())
1912            })
1913            .cloned();
1914        let Some(call_id) = unresolved.next() else {
1915            return Err(LoopError::InvalidState(
1916                "no approval request is pending".into(),
1917            ));
1918        };
1919        if unresolved.next().is_some() {
1920            return Err(LoopError::InvalidState(
1921                "multiple approvals are pending; use resolve_approval_for".into(),
1922            ));
1923        }
1924        self.resolve_approval_for(call_id, decision)
1925    }
1926
1927    /// Resolve a pending [`LoopInterrupt::AuthRequest`].
1928    ///
1929    /// The resolution must reference the same request id as the pending
1930    /// [`AuthRequest`].  After calling this, invoke [`next`](LoopDriver::next)
1931    /// to continue the loop.
1932    ///
1933    /// # Errors
1934    ///
1935    /// Returns [`LoopError::InvalidState`] if no auth request is pending or
1936    /// if the resolution's request id does not match.
1937    pub fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), LoopError> {
1938        let Some(pending) = self.pending_auth.as_mut() else {
1939            return Err(LoopError::InvalidState("no auth request is pending".into()));
1940        };
1941        if pending.request.id != resolution.request().id {
1942            return Err(LoopError::InvalidState(
1943                "auth resolution does not match the pending request".into(),
1944            ));
1945        }
1946        pending.resolution = Some(resolution.clone());
1947        self.emit(AgentEvent::AuthResolved {
1948            provided: matches!(resolution, AuthResolution::Provided { .. }),
1949        });
1950        Ok(())
1951    }
1952
1953    /// Take a read-only snapshot of the driver's current transcript and input queue.
1954    pub fn snapshot(&self) -> LoopSnapshot {
1955        LoopSnapshot {
1956            session_id: self.session_id.clone(),
1957            transcript: self.transcript.clone(),
1958            pending_input: self.pending_input.clone(),
1959        }
1960    }
1961
1962    /// Advance the loop by one step.
1963    ///
1964    /// This is the main method for driving the agent.  It processes pending
1965    /// interrupt resolutions, consumes queued input, starts a model turn,
1966    /// executes tool calls, and returns once the turn finishes or an
1967    /// interrupt occurs.
1968    ///
1969    /// If no input is queued and no interrupt is pending, returns
1970    /// [`LoopStep::Interrupt(LoopInterrupt::AwaitingInput(..))`](LoopInterrupt::AwaitingInput).
1971    ///
1972    /// # Errors
1973    ///
1974    /// Returns [`LoopError::InvalidState`] if called while an unresolved
1975    /// interrupt is pending, or propagates provider / tool / compaction errors.
1976    pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
1977        if self
1978            .pending_auth
1979            .as_ref()
1980            .is_some_and(|pending| pending.resolution.is_some())
1981        {
1982            let pending = self
1983                .pending_auth
1984                .take()
1985                .ok_or_else(|| LoopError::InvalidState("missing pending auth state".into()))?;
1986            return self.resume_after_auth(pending).await;
1987        }
1988
1989        if let Some(pending) = self.take_next_resolved_approval() {
1990            return self.resume_after_approval(pending).await;
1991        }
1992
1993        if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1994            return Ok(step);
1995        }
1996
1997        if let Some(step) = self.pending_auth_interrupt() {
1998            return Ok(step);
1999        }
2000
2001        if let Some(step) = self.next_unresolved_approval_interrupt() {
2002            return Ok(step);
2003        }
2004
2005        if let Some(step) = self.continue_active_tool_round().await? {
2006            return Ok(step);
2007        }
2008
2009        let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
2010        if let Some(step) = loop_step {
2011            return Ok(step);
2012        }
2013
2014        if self.pending_input.is_empty() && !had_loop_updates {
2015            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2016                InputRequest {
2017                    session_id: self.session_id.clone(),
2018                    reason: "driver is waiting for input".into(),
2019                },
2020            )));
2021        }
2022
2023        let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
2024        self.next_turn_index += 1;
2025        self.transcript.append(&mut self.pending_input);
2026        self.drive_turn(turn_id, true).await
2027    }
2028
2029    fn emit(&mut self, event: AgentEvent) {
2030        for observer in &mut self.observers {
2031            observer.handle_event(event.clone());
2032        }
2033    }
2034}
2035
2036fn interrupted_metadata(stage: &str) -> MetadataMap {
2037    let mut metadata = MetadataMap::new();
2038    metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2039    metadata.insert(
2040        INTERRUPT_REASON_METADATA_KEY.into(),
2041        USER_CANCELLED_REASON.into(),
2042    );
2043    metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2044    metadata
2045}
2046
2047fn interrupted_assistant_items() -> Vec<Item> {
2048    vec![Item {
2049        id: None,
2050        kind: ItemKind::Assistant,
2051        parts: vec![Part::Text(TextPart {
2052            text: "Previous assistant response was interrupted by the user before completion."
2053                .into(),
2054            metadata: interrupted_metadata("assistant"),
2055        })],
2056        metadata: interrupted_metadata("assistant"),
2057    }]
2058}
2059
2060fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2061    let mut calls = Vec::new();
2062    for item in items {
2063        for part in &item.parts {
2064            if let Part::ToolCall(call) = part {
2065                calls.push(call.clone());
2066            }
2067        }
2068    }
2069    calls
2070}
2071
2072fn upgrade_auth_request(
2073    mut request: AuthRequest,
2074    tool_request: &ToolRequest,
2075    _call: &ToolCallPart,
2076) -> AuthRequest {
2077    if matches!(request.operation, AuthOperation::ToolCall { .. }) {
2078        return request;
2079    }
2080
2081    let prior_server_id = request.challenge.get("server_id").cloned();
2082    let mut metadata = tool_request.metadata.clone();
2083    if let Some(server_id) = prior_server_id {
2084        metadata.entry("server_id".into()).or_insert(server_id);
2085    }
2086    request.operation = AuthOperation::ToolCall {
2087        tool_name: tool_request.tool_name.0.clone(),
2088        input: tool_request.input.clone(),
2089        call_id: Some(tool_request.call_id.clone()),
2090        session_id: Some(tool_request.session_id.clone()),
2091        turn_id: Some(tool_request.turn_id.clone()),
2092        metadata,
2093    };
2094    request
2095}
2096
2097struct AllowAllPermissions;
2098
2099impl PermissionChecker for AllowAllPermissions {
2100    fn evaluate(
2101        &self,
2102        _request: &dyn agentkit_tools_core::PermissionRequest,
2103    ) -> agentkit_tools_core::PermissionDecision {
2104        agentkit_tools_core::PermissionDecision::Allow
2105    }
2106}
2107
/// Errors that can occur while driving the agent loop.
///
/// Each variant's `Display` text comes from its `#[error(...)]` attribute.
/// The `#[from]` attribute on [`LoopError::Tool`] provides a conversion from
/// [`ToolError`], so `?` lifts tool errors into this type.
#[derive(Debug, Error)]
pub enum LoopError {
    /// The driver was in an unexpected state for the requested operation.
    /// An example is resolving an interrupt that is not pending, or
    /// submitting input while one is.
    #[error("invalid driver state: {0}")]
    InvalidState(String),
    /// The current turn was cancelled via the [`CancellationHandle`].
    ///
    /// When a model turn reports this, the loop finishes the turn as
    /// cancelled rather than propagating the error.
    #[error("turn cancelled")]
    Cancelled,
    /// An error originating from the model provider.
    #[error("provider error: {0}")]
    Provider(String),
    /// An error originating from tool execution.
    #[error("tool error: {0}")]
    Tool(#[from] ToolError),
    /// An error that occurred during transcript compaction.
    #[error("compaction error: {0}")]
    Compaction(String),
    /// The requested operation is not supported.
    #[error("unsupported operation: {0}")]
    Unsupported(String),
}
2130
2131#[cfg(test)]
2132mod tests {
2133    use std::collections::VecDeque;
2134    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2135    use std::sync::{Arc as StdArc, Mutex as StdMutex};
2136
2137    use agentkit_compaction::{CompactionPipeline, CompactionTrigger, KeepRecentStrategy};
2138    use agentkit_core::{
2139        CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolOutput, ToolResultPart,
2140    };
2141    use agentkit_task_manager::{
2142        AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2143        TaskRoutingPolicy,
2144    };
2145    use agentkit_tools_core::{
2146        FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2147        ToolAnnotations, ToolName, ToolResult, ToolSpec,
2148    };
2149    use serde_json::{Value, json};
2150    use tokio::sync::Notify;
2151    use tokio::time::{Duration, timeout};
2152
2153    use super::*;
2154
2155    struct FakeAdapter;
2156    struct SlowAdapter;
2157    struct RecordingAdapter {
2158        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
2159        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
2160    }
2161    struct MultiToolAdapter;
2162    struct DualApprovalAdapter;
2163
2164    struct FakeSession;
2165    struct SlowSession;
2166    struct RecordingSession {
2167        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
2168        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
2169    }
2170    struct MultiToolSession;
2171    struct DualApprovalSession;
2172
2173    struct FakeTurn {
2174        events: VecDeque<ModelTurnEvent>,
2175    }
2176
2177    struct SlowTurn {
2178        emitted: bool,
2179    }
2180
2181    struct RecordingTurn {
2182        emitted: bool,
2183    }
2184    struct MultiToolTurn {
2185        events: VecDeque<ModelTurnEvent>,
2186    }
2187    struct DualApprovalTurn {
2188        events: VecDeque<ModelTurnEvent>,
2189    }
2190
2191    #[async_trait]
2192    impl ModelAdapter for FakeAdapter {
2193        type Session = FakeSession;
2194
2195        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2196            Ok(FakeSession)
2197        }
2198    }
2199
2200    #[async_trait]
2201    impl ModelAdapter for SlowAdapter {
2202        type Session = SlowSession;
2203
2204        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2205            Ok(SlowSession)
2206        }
2207    }
2208
2209    #[async_trait]
2210    impl ModelAdapter for RecordingAdapter {
2211        type Session = RecordingSession;
2212
2213        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2214            Ok(RecordingSession {
2215                seen_descriptions: self.seen_descriptions.clone(),
2216                seen_caches: self.seen_caches.clone(),
2217            })
2218        }
2219    }
2220
2221    #[async_trait]
2222    impl ModelAdapter for MultiToolAdapter {
2223        type Session = MultiToolSession;
2224
2225        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2226            Ok(MultiToolSession)
2227        }
2228    }
2229
2230    #[async_trait]
2231    impl ModelAdapter for DualApprovalAdapter {
2232        type Session = DualApprovalSession;
2233
2234        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2235            Ok(DualApprovalSession)
2236        }
2237    }
2238
2239    #[async_trait]
2240    impl ModelSession for FakeSession {
2241        type Turn = FakeTurn;
2242
2243        async fn begin_turn(
2244            &mut self,
2245            request: TurnRequest,
2246            _cancellation: Option<TurnCancellation>,
2247        ) -> Result<Self::Turn, LoopError> {
2248            let has_tool_result = request.transcript.iter().any(|item| {
2249                item.kind == ItemKind::Tool
2250                    && item
2251                        .parts
2252                        .iter()
2253                        .any(|part| matches!(part, Part::ToolResult(_)))
2254            });
2255            let tool_name = request
2256                .available_tools
2257                .first()
2258                .map(|tool| tool.name.0.clone())
2259                .unwrap_or_else(|| "echo".into());
2260
2261            let events = if has_tool_result {
2262                let result_text = request
2263                    .transcript
2264                    .iter()
2265                    .rev()
2266                    .find_map(|item| {
2267                        item.parts.iter().find_map(|part| match part {
2268                            Part::ToolResult(ToolResultPart {
2269                                output: ToolOutput::Text(text),
2270                                ..
2271                            }) => Some(text.clone()),
2272                            _ => None,
2273                        })
2274                    })
2275                    .unwrap_or_else(|| "missing".into());
2276
2277                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2278                    finish_reason: FinishReason::Completed,
2279                    output_items: vec![Item {
2280                        id: None,
2281                        kind: ItemKind::Assistant,
2282                        parts: vec![Part::Text(TextPart {
2283                            text: format!("tool said: {result_text}"),
2284                            metadata: MetadataMap::new(),
2285                        })],
2286                        metadata: MetadataMap::new(),
2287                    }],
2288                    usage: None,
2289                    metadata: MetadataMap::new(),
2290                })])
2291            } else {
2292                VecDeque::from([
2293                    ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2294                        id: ToolCallId::new("call-1"),
2295                        name: tool_name.clone(),
2296                        input: json!({ "value": "pong" }),
2297                        metadata: MetadataMap::new(),
2298                    }),
2299                    ModelTurnEvent::Finished(ModelTurnResult {
2300                        finish_reason: FinishReason::ToolCall,
2301                        output_items: vec![Item {
2302                            id: None,
2303                            kind: ItemKind::Assistant,
2304                            parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2305                                id: ToolCallId::new("call-1"),
2306                                name: tool_name,
2307                                input: json!({ "value": "pong" }),
2308                                metadata: MetadataMap::new(),
2309                            })],
2310                            metadata: MetadataMap::new(),
2311                        }],
2312                        usage: None,
2313                        metadata: MetadataMap::new(),
2314                    }),
2315                ])
2316            };
2317
2318            Ok(FakeTurn { events })
2319        }
2320    }
2321
2322    #[async_trait]
2323    impl ModelSession for SlowSession {
2324        type Turn = SlowTurn;
2325
2326        async fn begin_turn(
2327            &mut self,
2328            request: TurnRequest,
2329            cancellation: Option<TurnCancellation>,
2330        ) -> Result<Self::Turn, LoopError> {
2331            let should_block = request
2332                .transcript
2333                .iter()
2334                .rev()
2335                .find(|item| item.kind == ItemKind::User)
2336                .is_some_and(|item| {
2337                    item.parts.iter().any(|part| match part {
2338                        Part::Text(text) => text.text == "do the long task",
2339                        _ => false,
2340                    })
2341                });
2342
2343            if should_block && let Some(cancellation) = cancellation {
2344                cancellation.cancelled().await;
2345                return Err(LoopError::Cancelled);
2346            }
2347
2348            Ok(SlowTurn { emitted: false })
2349        }
2350    }
2351
2352    #[async_trait]
2353    impl ModelSession for RecordingSession {
2354        type Turn = RecordingTurn;
2355
2356        async fn begin_turn(
2357            &mut self,
2358            request: TurnRequest,
2359            _cancellation: Option<TurnCancellation>,
2360        ) -> Result<Self::Turn, LoopError> {
2361            let descriptions = request
2362                .available_tools
2363                .iter()
2364                .map(|tool| tool.description.clone())
2365                .collect::<Vec<_>>();
2366            self.seen_descriptions.lock().unwrap().push(descriptions);
2367            self.seen_caches.lock().unwrap().push(request.cache.clone());
2368
2369            Ok(RecordingTurn { emitted: false })
2370        }
2371    }
2372
2373    #[async_trait]
2374    impl ModelSession for MultiToolSession {
2375        type Turn = MultiToolTurn;
2376
2377        async fn begin_turn(
2378            &mut self,
2379            request: TurnRequest,
2380            _cancellation: Option<TurnCancellation>,
2381        ) -> Result<Self::Turn, LoopError> {
2382            let has_tool_result = request.transcript.iter().any(|item| {
2383                item.kind == ItemKind::Tool
2384                    && item
2385                        .parts
2386                        .iter()
2387                        .any(|part| matches!(part, Part::ToolResult(_)))
2388            });
2389
2390            let events = if has_tool_result {
2391                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2392                    finish_reason: FinishReason::Completed,
2393                    output_items: vec![Item {
2394                        id: None,
2395                        kind: ItemKind::Assistant,
2396                        parts: vec![Part::Text(TextPart {
2397                            text: "mixed tools finished".into(),
2398                            metadata: MetadataMap::new(),
2399                        })],
2400                        metadata: MetadataMap::new(),
2401                    }],
2402                    usage: None,
2403                    metadata: MetadataMap::new(),
2404                })])
2405            } else {
2406                let foreground = agentkit_core::ToolCallPart {
2407                    id: ToolCallId::new("call-foreground"),
2408                    name: "foreground-wait".into(),
2409                    input: json!({}),
2410                    metadata: MetadataMap::new(),
2411                };
2412                let background = agentkit_core::ToolCallPart {
2413                    id: ToolCallId::new("call-background"),
2414                    name: "background-wait".into(),
2415                    input: json!({}),
2416                    metadata: MetadataMap::new(),
2417                };
2418                VecDeque::from([
2419                    ModelTurnEvent::ToolCall(foreground.clone()),
2420                    ModelTurnEvent::ToolCall(background.clone()),
2421                    ModelTurnEvent::Finished(ModelTurnResult {
2422                        finish_reason: FinishReason::ToolCall,
2423                        output_items: vec![Item {
2424                            id: None,
2425                            kind: ItemKind::Assistant,
2426                            parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2427                            metadata: MetadataMap::new(),
2428                        }],
2429                        usage: None,
2430                        metadata: MetadataMap::new(),
2431                    }),
2432                ])
2433            };
2434
2435            Ok(MultiToolTurn { events })
2436        }
2437    }
2438
2439    #[async_trait]
2440    impl ModelSession for DualApprovalSession {
2441        type Turn = DualApprovalTurn;
2442
2443        async fn begin_turn(
2444            &mut self,
2445            request: TurnRequest,
2446            _cancellation: Option<TurnCancellation>,
2447        ) -> Result<Self::Turn, LoopError> {
2448            let tool_results = request
2449                .transcript
2450                .iter()
2451                .flat_map(|item| item.parts.iter())
2452                .filter(|part| matches!(part, Part::ToolResult(_)))
2453                .count();
2454
2455            let events = if tool_results >= 2 {
2456                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2457                    finish_reason: FinishReason::Completed,
2458                    output_items: vec![Item {
2459                        id: None,
2460                        kind: ItemKind::Assistant,
2461                        parts: vec![Part::Text(TextPart {
2462                            text: "both approvals finished".into(),
2463                            metadata: MetadataMap::new(),
2464                        })],
2465                        metadata: MetadataMap::new(),
2466                    }],
2467                    usage: None,
2468                    metadata: MetadataMap::new(),
2469                })])
2470            } else {
2471                let first = agentkit_core::ToolCallPart {
2472                    id: ToolCallId::new("call-1"),
2473                    name: "echo".into(),
2474                    input: json!({ "value": "first" }),
2475                    metadata: MetadataMap::new(),
2476                };
2477                let second = agentkit_core::ToolCallPart {
2478                    id: ToolCallId::new("call-2"),
2479                    name: "echo".into(),
2480                    input: json!({ "value": "second" }),
2481                    metadata: MetadataMap::new(),
2482                };
2483                VecDeque::from([
2484                    ModelTurnEvent::ToolCall(first.clone()),
2485                    ModelTurnEvent::ToolCall(second.clone()),
2486                    ModelTurnEvent::Finished(ModelTurnResult {
2487                        finish_reason: FinishReason::ToolCall,
2488                        output_items: vec![Item {
2489                            id: None,
2490                            kind: ItemKind::Assistant,
2491                            parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
2492                            metadata: MetadataMap::new(),
2493                        }],
2494                        usage: None,
2495                        metadata: MetadataMap::new(),
2496                    }),
2497                ])
2498            };
2499
2500            Ok(DualApprovalTurn { events })
2501        }
2502    }
2503
2504    #[async_trait]
2505    impl ModelTurn for FakeTurn {
2506        async fn next_event(
2507            &mut self,
2508            _cancellation: Option<TurnCancellation>,
2509        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2510            Ok(self.events.pop_front())
2511        }
2512    }
2513
2514    #[async_trait]
2515    impl ModelTurn for SlowTurn {
2516        async fn next_event(
2517            &mut self,
2518            cancellation: Option<TurnCancellation>,
2519        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2520            if let Some(cancellation) = cancellation
2521                && cancellation.is_cancelled()
2522            {
2523                return Err(LoopError::Cancelled);
2524            }
2525
2526            if self.emitted {
2527                Ok(None)
2528            } else {
2529                self.emitted = true;
2530                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2531                    finish_reason: FinishReason::Completed,
2532                    output_items: vec![Item {
2533                        id: None,
2534                        kind: ItemKind::Assistant,
2535                        parts: vec![Part::Text(TextPart {
2536                            text: "done".into(),
2537                            metadata: MetadataMap::new(),
2538                        })],
2539                        metadata: MetadataMap::new(),
2540                    }],
2541                    usage: None,
2542                    metadata: MetadataMap::new(),
2543                })))
2544            }
2545        }
2546    }
2547
2548    #[async_trait]
2549    impl ModelTurn for RecordingTurn {
2550        async fn next_event(
2551            &mut self,
2552            _cancellation: Option<TurnCancellation>,
2553        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2554            if self.emitted {
2555                Ok(None)
2556            } else {
2557                self.emitted = true;
2558                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2559                    finish_reason: FinishReason::Completed,
2560                    output_items: vec![Item {
2561                        id: None,
2562                        kind: ItemKind::Assistant,
2563                        parts: vec![Part::Text(TextPart {
2564                            text: "done".into(),
2565                            metadata: MetadataMap::new(),
2566                        })],
2567                        metadata: MetadataMap::new(),
2568                    }],
2569                    usage: None,
2570                    metadata: MetadataMap::new(),
2571                })))
2572            }
2573        }
2574    }
2575
2576    #[async_trait]
2577    impl ModelTurn for MultiToolTurn {
2578        async fn next_event(
2579            &mut self,
2580            _cancellation: Option<TurnCancellation>,
2581        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2582            Ok(self.events.pop_front())
2583        }
2584    }
2585
2586    #[async_trait]
2587    impl ModelTurn for DualApprovalTurn {
2588        async fn next_event(
2589            &mut self,
2590            _cancellation: Option<TurnCancellation>,
2591        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2592            Ok(self.events.pop_front())
2593        }
2594    }
2595
2596    #[derive(Clone)]
2597    struct EchoTool {
2598        spec: ToolSpec,
2599    }
2600
2601    impl Default for EchoTool {
2602        fn default() -> Self {
2603            Self {
2604                spec: ToolSpec {
2605                    name: ToolName::new("echo"),
2606                    description: "Echo back a value".into(),
2607                    input_schema: json!({
2608                        "type": "object",
2609                        "properties": {
2610                            "value": { "type": "string" }
2611                        },
2612                        "required": ["value"],
2613                        "additionalProperties": false
2614                    }),
2615                    annotations: ToolAnnotations::default(),
2616                    metadata: MetadataMap::new(),
2617                },
2618            }
2619        }
2620    }
2621
2622    #[derive(Clone)]
2623    struct DynamicSpecTool {
2624        spec: ToolSpec,
2625        version: StdArc<AtomicUsize>,
2626    }
2627
2628    impl DynamicSpecTool {
2629        fn new(version: StdArc<AtomicUsize>) -> Self {
2630            Self {
2631                spec: ToolSpec {
2632                    name: ToolName::new("dynamic"),
2633                    description: "dynamic version 0".into(),
2634                    input_schema: json!({
2635                        "type": "object",
2636                        "properties": {},
2637                        "additionalProperties": false
2638                    }),
2639                    annotations: ToolAnnotations::default(),
2640                    metadata: MetadataMap::new(),
2641                },
2642                version,
2643            }
2644        }
2645    }
2646
2647    #[async_trait]
2648    impl Tool for EchoTool {
2649        fn spec(&self) -> &ToolSpec {
2650            &self.spec
2651        }
2652
2653        fn proposed_requests(
2654            &self,
2655            request: &agentkit_tools_core::ToolRequest,
2656        ) -> Result<
2657            Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
2658            agentkit_tools_core::ToolError,
2659        > {
2660            Ok(vec![Box::new(FileSystemPermissionRequest::Read {
2661                path: "/tmp/echo".into(),
2662                metadata: request.metadata.clone(),
2663            })])
2664        }
2665
2666        async fn invoke(
2667            &self,
2668            request: agentkit_tools_core::ToolRequest,
2669            _ctx: &mut ToolContext<'_>,
2670        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2671            let value = request
2672                .input
2673                .get("value")
2674                .and_then(Value::as_str)
2675                .ok_or_else(|| {
2676                    agentkit_tools_core::ToolError::InvalidInput("missing value".into())
2677                })?;
2678
2679            Ok(ToolResult {
2680                result: ToolResultPart {
2681                    call_id: request.call_id,
2682                    output: ToolOutput::Text(value.into()),
2683                    is_error: false,
2684                    metadata: MetadataMap::new(),
2685                },
2686                duration: None,
2687                metadata: MetadataMap::new(),
2688            })
2689        }
2690    }
2691
2692    #[async_trait]
2693    impl Tool for DynamicSpecTool {
2694        fn spec(&self) -> &ToolSpec {
2695            &self.spec
2696        }
2697
2698        fn current_spec(&self) -> Option<ToolSpec> {
2699            let mut spec = self.spec.clone();
2700            spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
2701            Some(spec)
2702        }
2703
2704        async fn invoke(
2705            &self,
2706            request: agentkit_tools_core::ToolRequest,
2707            _ctx: &mut ToolContext<'_>,
2708        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2709            Ok(ToolResult {
2710                result: ToolResultPart {
2711                    call_id: request.call_id,
2712                    output: ToolOutput::Text("ok".into()),
2713                    is_error: false,
2714                    metadata: MetadataMap::new(),
2715                },
2716                duration: None,
2717                metadata: MetadataMap::new(),
2718            })
2719        }
2720    }
2721
2722    struct DenyFsReads;
2723
2724    impl PermissionChecker for DenyFsReads {
2725        fn evaluate(
2726            &self,
2727            request: &dyn agentkit_tools_core::PermissionRequest,
2728        ) -> PermissionDecision {
2729            if request.kind() == "filesystem.read" {
2730                return PermissionDecision::Deny(PermissionDenial {
2731                    code: PermissionCode::PathNotAllowed,
2732                    message: "reads denied in test".into(),
2733                    metadata: MetadataMap::new(),
2734                });
2735            }
2736
2737            PermissionDecision::Allow
2738        }
2739    }
2740
2741    struct ApproveFsReads;
2742
2743    impl PermissionChecker for ApproveFsReads {
2744        fn evaluate(
2745            &self,
2746            request: &dyn agentkit_tools_core::PermissionRequest,
2747        ) -> PermissionDecision {
2748            if request.kind() == "filesystem.read" {
2749                return PermissionDecision::RequireApproval(ApprovalRequest {
2750                    task_id: None,
2751                    call_id: None,
2752                    id: "approval:fs-read".into(),
2753                    request_kind: request.kind().into(),
2754                    reason: agentkit_tools_core::ApprovalReason::SensitivePath,
2755                    summary: request.summary(),
2756                    metadata: request.metadata().clone(),
2757                });
2758            }
2759
2760            PermissionDecision::Allow
2761        }
2762    }
2763
2764    struct CountTrigger;
2765
2766    impl CompactionTrigger for CountTrigger {
2767        fn should_compact(
2768            &self,
2769            _session_id: &SessionId,
2770            _turn_id: Option<&agentkit_core::TurnId>,
2771            transcript: &[Item],
2772        ) -> Option<agentkit_compaction::CompactionReason> {
2773            (transcript.len() >= 2)
2774                .then_some(agentkit_compaction::CompactionReason::TranscriptTooLong)
2775        }
2776    }
2777
2778    struct RecordingObserver {
2779        events: StdArc<StdMutex<Vec<AgentEvent>>>,
2780    }
2781
2782    impl LoopObserver for RecordingObserver {
2783        fn handle_event(&mut self, event: AgentEvent) {
2784            self.events.lock().unwrap().push(event);
2785        }
2786    }
2787
2788    #[derive(Clone)]
2789    struct AuthTool {
2790        spec: ToolSpec,
2791    }
2792
2793    impl Default for AuthTool {
2794        fn default() -> Self {
2795            Self {
2796                spec: ToolSpec {
2797                    name: ToolName::new("auth-tool"),
2798                    description: "Always requires auth".into(),
2799                    input_schema: json!({
2800                        "type": "object",
2801                        "properties": {},
2802                        "additionalProperties": false
2803                    }),
2804                    annotations: ToolAnnotations::default(),
2805                    metadata: MetadataMap::new(),
2806                },
2807            }
2808        }
2809    }
2810
2811    #[async_trait]
2812    impl Tool for AuthTool {
2813        fn spec(&self) -> &ToolSpec {
2814            &self.spec
2815        }
2816
2817        async fn invoke(
2818            &self,
2819            request: agentkit_tools_core::ToolRequest,
2820            _ctx: &mut ToolContext<'_>,
2821        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2822            let mut challenge = MetadataMap::new();
2823            challenge.insert("server_id".into(), json!("mock"));
2824            challenge.insert("scope".into(), json!("secret.read"));
2825
2826            Err(agentkit_tools_core::ToolError::AuthRequired(Box::new(
2827                AuthRequest {
2828                    task_id: None,
2829                    id: "auth-1".into(),
2830                    provider: "mcp.mock".into(),
2831                    operation: AuthOperation::ToolCall {
2832                        tool_name: request.tool_name.0,
2833                        input: request.input,
2834                        call_id: Some(request.call_id),
2835                        session_id: Some(request.session_id),
2836                        turn_id: Some(request.turn_id),
2837                        metadata: request.metadata,
2838                    },
2839                    challenge,
2840                },
2841            )))
2842        }
2843    }
2844
2845    #[derive(Clone)]
2846    struct BlockingTool {
2847        spec: ToolSpec,
2848        entered: StdArc<AtomicBool>,
2849        release: StdArc<Notify>,
2850        output: &'static str,
2851    }
2852
2853    impl BlockingTool {
2854        fn new(
2855            name: &str,
2856            entered: StdArc<AtomicBool>,
2857            release: StdArc<Notify>,
2858            output: &'static str,
2859        ) -> Self {
2860            Self {
2861                spec: ToolSpec {
2862                    name: ToolName::new(name),
2863                    description: format!("blocking tool {name}"),
2864                    input_schema: json!({
2865                        "type": "object",
2866                        "properties": {},
2867                        "additionalProperties": false
2868                    }),
2869                    annotations: ToolAnnotations::default(),
2870                    metadata: MetadataMap::new(),
2871                },
2872                entered,
2873                release,
2874                output,
2875            }
2876        }
2877    }
2878
2879    #[async_trait]
2880    impl Tool for BlockingTool {
2881        fn spec(&self) -> &ToolSpec {
2882            &self.spec
2883        }
2884
2885        async fn invoke(
2886            &self,
2887            request: agentkit_tools_core::ToolRequest,
2888            _ctx: &mut ToolContext<'_>,
2889        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2890            self.entered.store(true, Ordering::SeqCst);
2891            self.release.notified().await;
2892            Ok(ToolResult {
2893                result: ToolResultPart {
2894                    call_id: request.call_id,
2895                    output: ToolOutput::Text(self.output.into()),
2896                    is_error: false,
2897                    metadata: MetadataMap::new(),
2898                },
2899                duration: None,
2900                metadata: MetadataMap::new(),
2901            })
2902        }
2903    }
2904
2905    struct NameRoutingPolicy {
2906        routes: Vec<(String, RoutingDecision)>,
2907    }
2908
2909    impl NameRoutingPolicy {
2910        fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
2911            Self {
2912                routes: routes
2913                    .into_iter()
2914                    .map(|(name, decision)| (name.into(), decision))
2915                    .collect(),
2916            }
2917        }
2918    }
2919
2920    impl TaskRoutingPolicy for NameRoutingPolicy {
2921        fn route(&self, request: &ToolRequest) -> RoutingDecision {
2922            self.routes
2923                .iter()
2924                .find(|(name, _)| name == &request.tool_name.0)
2925                .map(|(_, decision)| *decision)
2926                .unwrap_or(RoutingDecision::Foreground)
2927        }
2928    }
2929
2930    async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
2931        timeout(Duration::from_secs(1), handle.next_event())
2932            .await
2933            .expect("timed out waiting for task event")
2934            .expect("task event stream ended unexpectedly")
2935    }
2936
2937    async fn wait_until_entered(flag: &AtomicBool) {
2938        timeout(Duration::from_secs(1), async {
2939            while !flag.load(Ordering::SeqCst) {
2940                tokio::task::yield_now().await;
2941            }
2942        })
2943        .await
2944        .expect("task never entered execution");
2945    }
2946
2947    #[tokio::test]
2948    async fn loop_continues_after_completed_tool_call() {
2949        let tools = ToolRegistry::new().with(EchoTool::default());
2950        let agent = Agent::builder()
2951            .model(FakeAdapter)
2952            .tools(tools)
2953            .permissions(AllowAllPermissions)
2954            .build()
2955            .unwrap();
2956
2957        let mut driver = agent
2958            .start(SessionConfig {
2959                session_id: SessionId::new("session-1"),
2960                metadata: MetadataMap::new(),
2961                cache: None,
2962            })
2963            .await
2964            .unwrap();
2965
2966        driver
2967            .submit_input(vec![Item {
2968                id: None,
2969                kind: ItemKind::User,
2970                parts: vec![Part::Text(TextPart {
2971                    text: "ping".into(),
2972                    metadata: MetadataMap::new(),
2973                })],
2974                metadata: MetadataMap::new(),
2975            }])
2976            .unwrap();
2977
2978        let result = driver.next().await.unwrap();
2979
2980        match result {
2981            LoopStep::Finished(turn) => {
2982                assert_eq!(turn.finish_reason, FinishReason::Completed);
2983                assert_eq!(turn.items.len(), 1);
2984                match &turn.items[0].parts[0] {
2985                    Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
2986                    other => panic!("unexpected part: {other:?}"),
2987                }
2988            }
2989            other => panic!("unexpected loop step: {other:?}"),
2990        }
2991    }
2992
2993    #[tokio::test]
2994    async fn loop_uses_injected_permission_checker() {
2995        let tools = ToolRegistry::new().with(EchoTool::default());
2996        let agent = Agent::builder()
2997            .model(FakeAdapter)
2998            .tools(tools)
2999            .permissions(DenyFsReads)
3000            .build()
3001            .unwrap();
3002
3003        let mut driver = agent
3004            .start(SessionConfig {
3005                session_id: SessionId::new("session-2"),
3006                metadata: MetadataMap::new(),
3007                cache: None,
3008            })
3009            .await
3010            .unwrap();
3011
3012        driver
3013            .submit_input(vec![Item {
3014                id: None,
3015                kind: ItemKind::User,
3016                parts: vec![Part::Text(TextPart {
3017                    text: "ping".into(),
3018                    metadata: MetadataMap::new(),
3019                })],
3020                metadata: MetadataMap::new(),
3021            }])
3022            .unwrap();
3023
3024        let result = driver.next().await.unwrap();
3025
3026        match result {
3027            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3028                Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3029                other => panic!("unexpected part: {other:?}"),
3030            },
3031            other => panic!("unexpected loop step: {other:?}"),
3032        }
3033    }
3034
3035    #[tokio::test]
3036    async fn loop_surfaces_auth_interruptions_from_tools() {
3037        let tools = ToolRegistry::new().with(AuthTool::default());
3038        let agent = Agent::builder()
3039            .model(FakeAdapter)
3040            .tools(tools)
3041            .permissions(AllowAllPermissions)
3042            .build()
3043            .unwrap();
3044
3045        let mut driver = agent
3046            .start(SessionConfig {
3047                session_id: SessionId::new("session-3"),
3048                metadata: MetadataMap::new(),
3049                cache: None,
3050            })
3051            .await
3052            .unwrap();
3053
3054        driver
3055            .submit_input(vec![Item {
3056                id: None,
3057                kind: ItemKind::User,
3058                parts: vec![Part::Text(TextPart {
3059                    text: "ping".into(),
3060                    metadata: MetadataMap::new(),
3061                })],
3062                metadata: MetadataMap::new(),
3063            }])
3064            .unwrap();
3065
3066        let result = driver.next().await.unwrap();
3067
3068        match result {
3069            LoopStep::Interrupt(LoopInterrupt::AuthRequest(pending)) => {
3070                let request = &pending.request;
3071                assert!(request.task_id.is_some());
3072                assert_eq!(request.provider, "mcp.mock");
3073                assert_eq!(request.challenge.get("scope"), Some(&json!("secret.read")));
3074                match &request.operation {
3075                    AuthOperation::ToolCall { tool_name, .. } => {
3076                        assert_eq!(tool_name, "auth-tool");
3077                    }
3078                    other => panic!("unexpected auth operation: {other:?}"),
3079                }
3080            }
3081            other => panic!("unexpected loop step: {other:?}"),
3082        }
3083    }
3084
    /// When the only requested tool is routed to the background, the first
    /// `next()` must return an `AwaitingInput` interrupt instead of blocking.
    /// Once the background task completes, a second `next()` must resume the
    /// model with the tool's result.
    #[tokio::test]
    async fn async_task_manager_background_round_requires_explicit_continue() {
        let entered = StdArc::new(AtomicBool::new(false));
        let release = StdArc::new(Notify::new());
        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
            "background-wait",
            RoutingDecision::Background,
        )]));
        // Keep a handle so the test can observe task events directly.
        let handle = task_manager.handle();
        let tools = ToolRegistry::new().with(BlockingTool::new(
            "background-wait",
            entered.clone(),
            release.clone(),
            "background-done",
        ));
        let agent = Agent::builder()
            .model(FakeAdapter)
            .tools(tools)
            .permissions(AllowAllPermissions)
            .task_manager(task_manager)
            .build()
            .unwrap();

        let mut driver = agent
            .start(SessionConfig {
                session_id: SessionId::new("session-background"),
                metadata: MetadataMap::new(),
                cache: None,
            })
            .await
            .unwrap();

        driver
            .submit_input(vec![Item {
                id: None,
                kind: ItemKind::User,
                parts: vec![Part::Text(TextPart {
                    text: "ping".into(),
                    metadata: MetadataMap::new(),
                })],
                metadata: MetadataMap::new(),
            }])
            .unwrap();

        // The background tool is still parked, so the loop must hand control
        // back to the host rather than wait for it.
        let first = driver.next().await.unwrap();
        match first {
            LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
            other => panic!("unexpected first loop step: {other:?}"),
        }

        match wait_for_task_event(&handle).await {
            TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
            other => panic!("unexpected task event: {other:?}"),
        }
        // NOTE(review): `notify_waiters` only wakes listeners that already
        // exist. This step relies on `BlockingTool` being registered on
        // `release` by the time `entered` is observed as true.
        wait_until_entered(entered.as_ref()).await;
        release.notify_waiters();

        match wait_for_task_event(&handle).await {
            TaskEvent::Completed(_, result) => {
                assert_eq!(result.output, ToolOutput::Text("background-done".into()))
            }
            other => panic!("unexpected completion event: {other:?}"),
        }

        // The host must explicitly call `next()` again to continue; the model
        // then sees the background result.
        let resumed = driver.next().await.unwrap();
        match resumed {
            LoopStep::Finished(turn) => {
                assert_eq!(turn.finish_reason, FinishReason::Completed);
                match &turn.items[0].parts[0] {
                    Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
                    other => panic!("unexpected part after resume: {other:?}"),
                }
            }
            other => panic!("unexpected resumed step: {other:?}"),
        }
    }
3161
    /// Interrupting a turn mid-flight must finish it as `Cancelled`, with a
    /// single assistant item flagged through `INTERRUPTED_METADATA_KEY`. The
    /// same driver must then accept new input and complete normally.
    #[tokio::test]
    async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
        let controller = CancellationController::new();
        let agent = Agent::builder()
            .model(SlowAdapter)
            .cancellation(controller.handle())
            .build()
            .unwrap();

        let mut driver = agent
            .start(SessionConfig {
                session_id: SessionId::new("session-cancel"),
                metadata: MetadataMap::new(),
                cache: None,
            })
            .await
            .unwrap();

        driver
            .submit_input(vec![Item {
                id: None,
                kind: ItemKind::User,
                parts: vec![Part::Text(TextPart {
                    text: "do the long task".into(),
                    metadata: MetadataMap::new(),
                })],
                metadata: MetadataMap::new(),
            }])
            .unwrap();

        // Race the turn against an interrupt issued after one cooperative yield.
        // NOTE(review): this relies on `driver.next()` yielding at least once
        // before it polls the turn's event; confirm against the loop
        // implementation if this test becomes flaky.
        let cancelled = tokio::join!(async { driver.next().await }, async {
            tokio::task::yield_now().await;
            controller.interrupt();
        })
        .0
        .unwrap();

        match cancelled {
            LoopStep::Finished(turn) => {
                assert_eq!(turn.finish_reason, FinishReason::Cancelled);
                assert_eq!(turn.items.len(), 1);
                assert_eq!(turn.items[0].kind, ItemKind::Assistant);
                assert_eq!(
                    turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
                    Some(&Value::Bool(true))
                );
            }
            other => panic!("unexpected loop step: {other:?}"),
        }

        // After cancellation the driver stays usable: new input runs a fresh turn.
        driver
            .submit_input(vec![Item {
                id: None,
                kind: ItemKind::User,
                parts: vec![Part::Text(TextPart {
                    text: "try again".into(),
                    metadata: MetadataMap::new(),
                })],
                metadata: MetadataMap::new(),
            }])
            .unwrap();

        let result = driver.next().await.unwrap();
        match result {
            LoopStep::Finished(turn) => {
                assert_eq!(turn.finish_reason, FinishReason::Completed);
            }
            other => panic!("unexpected loop step after retry: {other:?}"),
        }
    }
3232
    /// An interrupt must cancel only foreground tool tasks. The turn finishes as
    /// `Cancelled` and the foreground task reports `Cancelled`. The background
    /// task stays running until it is released, then completes normally.
    #[tokio::test]
    async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
        let controller = CancellationController::new();
        let fg_entered = StdArc::new(AtomicBool::new(false));
        let fg_release = StdArc::new(Notify::new());
        let bg_entered = StdArc::new(AtomicBool::new(false));
        let bg_release = StdArc::new(Notify::new());
        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
            ("foreground-wait", RoutingDecision::Foreground),
            ("background-wait", RoutingDecision::Background),
        ]));
        let handle = task_manager.handle();
        // The foreground tool is never released: the test expects it to be
        // cancelled by the interrupt instead.
        let tools = ToolRegistry::new()
            .with(BlockingTool::new(
                "foreground-wait",
                fg_entered.clone(),
                fg_release,
                "foreground-done",
            ))
            .with(BlockingTool::new(
                "background-wait",
                bg_entered.clone(),
                bg_release.clone(),
                "background-done",
            ));
        let agent = Agent::builder()
            .model(MultiToolAdapter)
            .tools(tools)
            .permissions(AllowAllPermissions)
            .cancellation(controller.handle())
            .task_manager(task_manager)
            .build()
            .unwrap();

        let mut driver = agent
            .start(SessionConfig {
                session_id: SessionId::new("session-mixed-cancel"),
                metadata: MetadataMap::new(),
                cache: None,
            })
            .await
            .unwrap();

        driver
            .submit_input(vec![Item {
                id: None,
                kind: ItemKind::User,
                parts: vec![Part::Text(TextPart {
                    text: "run both".into(),
                    metadata: MetadataMap::new(),
                })],
                metadata: MetadataMap::new(),
            }])
            .unwrap();

        // Drive the turn while a second future waits for both tasks to be
        // running inside their tools, then interrupts. The two discarded events
        // are presumably the two `Started` events; their contents are not
        // checked here.
        let cancelled = tokio::join!(async { driver.next().await }, async {
            let _ = wait_for_task_event(&handle).await;
            let _ = wait_for_task_event(&handle).await;
            wait_until_entered(fg_entered.as_ref()).await;
            wait_until_entered(bg_entered.as_ref()).await;
            controller.interrupt();
        })
        .0
        .unwrap();

        match cancelled {
            LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
            other => panic!("unexpected loop step after interrupt: {other:?}"),
        }

        match wait_for_task_event(&handle).await {
            TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
            other => panic!("unexpected post-interrupt event: {other:?}"),
        }

        // Only the background task should survive the interrupt.
        let running = handle.list_running().await;
        assert_eq!(running.len(), 1);
        assert_eq!(running[0].tool_name, "background-wait");

        bg_release.notify_waiters();
        match wait_for_task_event(&handle).await {
            TaskEvent::Completed(snapshot, result) => {
                assert_eq!(snapshot.tool_name, "background-wait");
                assert_eq!(result.output, ToolOutput::Text("background-done".into()));
            }
            other => panic!("unexpected background completion event: {other:?}"),
        }
    }
3321
3322    #[tokio::test]
3323    async fn loop_resumes_after_approved_tool_request() {
3324        let tools = ToolRegistry::new().with(EchoTool::default());
3325        let agent = Agent::builder()
3326            .model(FakeAdapter)
3327            .tools(tools)
3328            .permissions(ApproveFsReads)
3329            .build()
3330            .unwrap();
3331
3332        let mut driver = agent
3333            .start(SessionConfig {
3334                session_id: SessionId::new("session-approval"),
3335                metadata: MetadataMap::new(),
3336                cache: None,
3337            })
3338            .await
3339            .unwrap();
3340
3341        driver
3342            .submit_input(vec![Item {
3343                id: None,
3344                kind: ItemKind::User,
3345                parts: vec![Part::Text(TextPart {
3346                    text: "ping".into(),
3347                    metadata: MetadataMap::new(),
3348                })],
3349                metadata: MetadataMap::new(),
3350            }])
3351            .unwrap();
3352
3353        let first = driver.next().await.unwrap();
3354        match first {
3355            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3356                assert!(pending.request.task_id.is_some());
3357                assert_eq!(pending.request.id.0, "approval:fs-read");
3358                pending.approve(&mut driver).unwrap();
3359            }
3360            other => panic!("unexpected loop step: {other:?}"),
3361        }
3362        let second = driver.next().await.unwrap();
3363        match second {
3364            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3365                Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3366                other => panic!("unexpected part: {other:?}"),
3367            },
3368            other => panic!("unexpected loop step after approval: {other:?}"),
3369        }
3370    }
3371
3372    #[tokio::test]
3373    async fn loop_tracks_multiple_pending_approvals_by_call_id() {
3374        let tools = ToolRegistry::new().with(EchoTool::default());
3375        let agent = Agent::builder()
3376            .model(DualApprovalAdapter)
3377            .tools(tools)
3378            .permissions(ApproveFsReads)
3379            .build()
3380            .unwrap();
3381
3382        let mut driver = agent
3383            .start(SessionConfig {
3384                session_id: SessionId::new("session-dual-approval"),
3385                metadata: MetadataMap::new(),
3386                cache: None,
3387            })
3388            .await
3389            .unwrap();
3390
3391        driver
3392            .submit_input(vec![Item {
3393                id: None,
3394                kind: ItemKind::User,
3395                parts: vec![Part::Text(TextPart {
3396                    text: "run both approvals".into(),
3397                    metadata: MetadataMap::new(),
3398                })],
3399                metadata: MetadataMap::new(),
3400            }])
3401            .unwrap();
3402
3403        let pending_first = match driver.next().await.unwrap() {
3404            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3405                assert_eq!(
3406                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3407                    Some("call-1")
3408                );
3409                pending
3410            }
3411            other => panic!("unexpected first loop step: {other:?}"),
3412        };
3413
3414        let pending_second = match driver.next().await.unwrap() {
3415            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3416                assert_eq!(
3417                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3418                    Some("call-2")
3419                );
3420                pending
3421            }
3422            other => panic!("unexpected second loop step: {other:?}"),
3423        };
3424
3425        pending_second.approve(&mut driver).unwrap();
3426        match driver.next().await.unwrap() {
3427            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3428                assert_eq!(
3429                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3430                    Some("call-1")
3431                );
3432            }
3433            other => panic!("unexpected step after approving second request: {other:?}"),
3434        }
3435
3436        pending_first.approve(&mut driver).unwrap();
3437        match driver.next().await.unwrap() {
3438            LoopStep::Finished(turn) => {
3439                assert_eq!(turn.finish_reason, FinishReason::Completed);
3440                match &turn.items[0].parts[0] {
3441                    Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
3442                    other => panic!("unexpected final part: {other:?}"),
3443                }
3444            }
3445            other => panic!("unexpected final loop step: {other:?}"),
3446        }
3447    }
3448
3449    #[tokio::test]
3450    async fn loop_compacts_transcript_before_new_turns() {
3451        let events = StdArc::new(StdMutex::new(Vec::new()));
3452        let agent = Agent::builder()
3453            .model(FakeAdapter)
3454            .compaction(CompactionConfig::new(
3455                CountTrigger,
3456                CompactionPipeline::new().with_strategy(KeepRecentStrategy::new(1)),
3457            ))
3458            .observer(RecordingObserver {
3459                events: events.clone(),
3460            })
3461            .build()
3462            .unwrap();
3463
3464        let mut driver = agent
3465            .start(SessionConfig {
3466                session_id: SessionId::new("session-4"),
3467                metadata: MetadataMap::new(),
3468                cache: None,
3469            })
3470            .await
3471            .unwrap();
3472
3473        for text in ["first", "second"] {
3474            driver
3475                .submit_input(vec![Item {
3476                    id: None,
3477                    kind: ItemKind::User,
3478                    parts: vec![Part::Text(TextPart {
3479                        text: text.into(),
3480                        metadata: MetadataMap::new(),
3481                    })],
3482                    metadata: MetadataMap::new(),
3483                }])
3484                .unwrap();
3485            let _ = driver.next().await.unwrap();
3486        }
3487
3488        let events = events.lock().unwrap();
3489        assert!(events.iter().any(|event| matches!(
3490            event,
3491            AgentEvent::CompactionFinished {
3492                replaced_items,
3493                ..
3494            } if *replaced_items > 0
3495        )));
3496    }
3497
3498    #[tokio::test]
3499    async fn loop_refreshes_tool_specs_each_turn() {
3500        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
3501        let version = StdArc::new(AtomicUsize::new(1));
3502        let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
3503        let agent = Agent::builder()
3504            .model(RecordingAdapter {
3505                seen_descriptions: seen_descriptions.clone(),
3506                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
3507            })
3508            .tools(tools)
3509            .permissions(AllowAllPermissions)
3510            .build()
3511            .unwrap();
3512
3513        let mut driver = agent
3514            .start(SessionConfig {
3515                session_id: SessionId::new("session-dynamic-tools"),
3516                metadata: MetadataMap::new(),
3517                cache: None,
3518            })
3519            .await
3520            .unwrap();
3521
3522        for text in ["first", "second"] {
3523            driver
3524                .submit_input(vec![Item {
3525                    id: None,
3526                    kind: ItemKind::User,
3527                    parts: vec![Part::Text(TextPart {
3528                        text: text.into(),
3529                        metadata: MetadataMap::new(),
3530                    })],
3531                    metadata: MetadataMap::new(),
3532                }])
3533                .unwrap();
3534
3535            let _ = driver.next().await.unwrap();
3536            if text == "first" {
3537                version.store(2, Ordering::SeqCst);
3538            }
3539        }
3540
3541        let seen_descriptions = seen_descriptions.lock().unwrap();
3542        assert_eq!(seen_descriptions.len(), 2);
3543        assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
3544        assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
3545    }
3546
3547    #[tokio::test]
3548    async fn loop_passes_session_default_and_next_turn_cache_requests() {
3549        let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
3550        let agent = Agent::builder()
3551            .model(RecordingAdapter {
3552                seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
3553                seen_caches: seen_caches.clone(),
3554            })
3555            .permissions(AllowAllPermissions)
3556            .build()
3557            .unwrap();
3558
3559        let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
3560            .with_retention(PromptCacheRetention::Short);
3561        let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
3562            breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
3563        });
3564
3565        let mut driver = agent
3566            .start(SessionConfig {
3567                session_id: SessionId::new("session-cache"),
3568                metadata: MetadataMap::new(),
3569                cache: Some(default_cache.clone()),
3570            })
3571            .await
3572            .unwrap();
3573
3574        driver
3575            .submit_input(vec![Item {
3576                id: None,
3577                kind: ItemKind::User,
3578                parts: vec![Part::Text(TextPart {
3579                    text: "first".into(),
3580                    metadata: MetadataMap::new(),
3581                })],
3582                metadata: MetadataMap::new(),
3583            }])
3584            .unwrap();
3585        let _ = driver.next().await.unwrap();
3586
3587        driver
3588            .submit_input_with_cache(
3589                vec![Item {
3590                    id: None,
3591                    kind: ItemKind::User,
3592                    parts: vec![Part::Text(TextPart {
3593                        text: "second".into(),
3594                        metadata: MetadataMap::new(),
3595                    })],
3596                    metadata: MetadataMap::new(),
3597                }],
3598                override_cache.clone(),
3599            )
3600            .unwrap();
3601        let _ = driver.next().await.unwrap();
3602
3603        let seen = seen_caches.lock().unwrap();
3604        assert_eq!(seen.len(), 2);
3605        assert_eq!(seen[0], Some(default_cache));
3606        assert_eq!(seen[1], Some(override_cache));
3607    }
3608
3609    #[test]
3610    fn convenience_cache_builders_construct_expected_defaults() {
3611        let cache = PromptCacheRequest::automatic()
3612            .with_retention(PromptCacheRetention::Short)
3613            .with_key("workspace:demo");
3614        let session = SessionConfig::new("demo").with_cache(cache.clone());
3615
3616        assert_eq!(session.session_id, SessionId::new("demo"));
3617        assert_eq!(session.cache, Some(cache));
3618
3619        let explicit = PromptCacheRequest::explicit([
3620            PromptCacheBreakpoint::tools_end(),
3621            PromptCacheBreakpoint::transcript_item_end(2),
3622            PromptCacheBreakpoint::transcript_part_end(3, 1),
3623        ]);
3624
3625        assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
3626        assert_eq!(
3627            explicit.strategy,
3628            PromptCacheStrategy::Explicit {
3629                breakpoints: vec![
3630                    PromptCacheBreakpoint::ToolsEnd,
3631                    PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
3632                    PromptCacheBreakpoint::TranscriptPartEnd {
3633                        item_index: 3,
3634                        part_index: 1,
3635                    },
3636                ],
3637            }
3638        );
3639    }
3640}