Skip to main content

agent_sdk/
agent_loop.rs

1//! Agent loop orchestration module.
2//!
3//! This module contains the core agent loop that orchestrates LLM calls,
4//! tool execution, and event handling. The agent loop is the main entry point
5//! for running an AI agent.
6//!
7//! # Architecture
8//!
9//! The agent loop works as follows:
10//! 1. Receives a user message
11//! 2. Sends the message to the LLM provider
12//! 3. Processes the LLM response (text or tool calls)
13//! 4. If tool calls are present, executes them and feeds results back to LLM
14//! 5. Repeats until the LLM responds with only text (no tool calls)
15//! 6. Emits events throughout for real-time UI updates
16//!
17//! # Building an Agent
18//!
19//! Use the builder pattern via [`builder()`] or [`AgentLoopBuilder`]:
20//!
21//! ```ignore
22//! use agent_sdk::{builder, providers::AnthropicProvider};
23//!
24//! let agent = builder()
25//!     .provider(AnthropicProvider::sonnet(api_key))
26//!     .tools(my_tools)
27//!     .build();
28//! ```
29
30mod builder;
31mod helpers;
32mod idempotency;
33mod listen;
34mod llm;
35mod run_loop;
36#[cfg(test)]
37mod test_utils;
38#[cfg(test)]
39mod tests;
40mod tool_execution;
41mod turn;
42mod types;
43
44use self::run_loop::{run_loop, run_single_turn};
45use self::types::{RunLoopParameters, TurnParameters};
46
47pub use self::builder::AgentLoopBuilder;
48
49use crate::context::{CompactionConfig, ContextCompactor};
50use crate::events::{AgentEventEnvelope, SequenceCounter};
51use crate::hooks::AgentHooks;
52use crate::llm::LlmProvider;
53use crate::stores::{MessageStore, StateStore, ToolExecutionStore};
54use crate::tools::{ToolContext, ToolRegistry};
55use crate::types::{AgentConfig, AgentInput, AgentRunState, ThreadId, TurnOutcome};
56use std::sync::Arc;
57use tokio::sync::{mpsc, oneshot};
58use tokio_util::sync::CancellationToken;
59
60/// The main agent loop that orchestrates LLM calls and tool execution.
61///
62/// `AgentLoop` is the core component that:
63/// - Manages conversation state via message and state stores
64/// - Calls the LLM provider and processes responses
65/// - Executes tools through the tool registry
66/// - Emits events for real-time updates via an async channel
67/// - Enforces hooks for tool permissions and lifecycle events
68///
69/// # Type Parameters
70///
71/// - `Ctx`: Application-specific context passed to tools (e.g., user ID, database)
72/// - `P`: The LLM provider implementation
73/// - `H`: The hooks implementation for lifecycle customization
74/// - `M`: The message store implementation
75/// - `S`: The state store implementation
76///
77/// # Event Channel Behavior
78///
79/// The agent uses a bounded channel (capacity 100) for events. Events are sent
80/// using non-blocking sends:
81///
82/// - If the channel has space, events are sent immediately
83/// - If the channel is full, the agent waits up to 30 seconds before timing out
84/// - If the receiver is dropped, the agent continues processing without blocking
85///
86/// This design ensures that slow consumers don't stall the LLM stream, but events
87/// may be dropped if the consumer is too slow or disconnects.
88///
89/// # Running the Agent
90///
91/// ```ignore
92/// let (mut events, final_state) = agent.run(
93///     thread_id,
94///     AgentInput::Text("Hello!".to_string()),
95///     tool_ctx,
96/// );
97/// while let Some(envelope) = events.recv().await {
98///     match envelope.event {
99///         AgentEvent::Text {
100///             message_id: _,
101///             text,
102///         } => println!("{}", text),
103///         AgentEvent::Done { .. } => break,
104///         _ => {}
105///     }
106/// }
107/// ```
108pub struct AgentLoop<Ctx, P, H, M, S>
109where
110    P: LlmProvider,
111    H: AgentHooks,
112    M: MessageStore,
113    S: StateStore,
114{
115    pub(super) provider: Arc<P>,
116    pub(super) tools: Arc<ToolRegistry<Ctx>>,
117    pub(super) hooks: Arc<H>,
118    pub(super) message_store: Arc<M>,
119    pub(super) state_store: Arc<S>,
120    pub(super) config: AgentConfig,
121    pub(super) compaction_config: Option<CompactionConfig>,
122    pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
123    pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
124}
125
126/// Create a new builder for constructing an `AgentLoop`.
127#[must_use]
128pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
129    AgentLoopBuilder::new()
130}
131
132impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
133where
134    Ctx: Send + Sync + 'static,
135    P: LlmProvider + 'static,
136    H: AgentHooks + 'static,
137    M: MessageStore + 'static,
138    S: StateStore + 'static,
139{
140    /// Create a new agent loop with all components specified directly.
141    #[must_use]
142    pub fn new(
143        provider: P,
144        tools: ToolRegistry<Ctx>,
145        hooks: H,
146        message_store: M,
147        state_store: S,
148        config: AgentConfig,
149    ) -> Self {
150        Self {
151            provider: Arc::new(provider),
152            tools: Arc::new(tools),
153            hooks: Arc::new(hooks),
154            message_store: Arc::new(message_store),
155            state_store: Arc::new(state_store),
156            config,
157            compaction_config: None,
158            compactor: None,
159            execution_store: None,
160        }
161    }
162
163    /// Create a new agent loop with compaction enabled.
164    #[must_use]
165    pub fn with_compaction(
166        provider: P,
167        tools: ToolRegistry<Ctx>,
168        hooks: H,
169        message_store: M,
170        state_store: S,
171        config: AgentConfig,
172        compaction_config: CompactionConfig,
173    ) -> Self {
174        Self {
175            provider: Arc::new(provider),
176            tools: Arc::new(tools),
177            hooks: Arc::new(hooks),
178            message_store: Arc::new(message_store),
179            state_store: Arc::new(state_store),
180            config,
181            compaction_config: Some(compaction_config),
182            compactor: None,
183            execution_store: None,
184        }
185    }
186
187    /// Run the agent loop.
188    ///
189    /// This method allows the agent to pause when a tool requires confirmation,
190    /// returning an `AgentRunState::AwaitingConfirmation` that contains the
191    /// state needed to resume.
192    ///
193    /// When the `cancel_token` is cancelled, the agent will stop after the
194    /// current turn completes (no new turns will start). The final state will
195    /// be `AgentRunState::Cancelled`.
196    ///
197    /// # Arguments
198    ///
199    /// * `thread_id` - The thread identifier for this conversation
200    /// * `input` - Either a new text message or a resume with confirmation decision
201    /// * `tool_context` - Context passed to tools
202    /// * `cancel_token` - Token to signal cancellation from outside
203    ///
204    /// # Returns
205    ///
206    /// A tuple of:
207    /// - `mpsc::Receiver<AgentEvent>` - Channel for streaming events
208    /// - `oneshot::Receiver<AgentRunState>` - Channel for the final state
209    ///
210    /// # Example
211    ///
212    /// ```ignore
213    /// let cancel = CancellationToken::new();
214    /// let (events, final_state) = agent.run(
215    ///     thread_id,
216    ///     AgentInput::Text("Hello".to_string()),
217    ///     tool_ctx,
218    ///     cancel.clone(),
219    /// );
220    ///
221    /// while let Some(envelope) = events.recv().await {
222    ///     // Handle events...
223    /// }
224    ///
225    /// match final_state.await.unwrap() {
226    ///     AgentRunState::Done { .. } => { /* completed */ }
227    ///     AgentRunState::Cancelled { .. } => { /* user cancelled */ }
228    ///     AgentRunState::AwaitingConfirmation { continuation, .. } => {
229    ///         // Get user decision, then resume:
230    ///         let (events2, state2) = agent.run(
231    ///             thread_id,
232    ///             AgentInput::Resume {
233    ///                 continuation,
234    ///                 tool_call_id: id,
235    ///                 confirmed: true,
236    ///                 rejection_reason: None,
237    ///             },
238    ///             tool_ctx,
239    ///             cancel.clone(),
240    ///         );
241    ///     }
242    ///     AgentRunState::Error(e) => { /* handle error */ }
243    /// }
244    /// ```
245    pub fn run(
246        &self,
247        thread_id: ThreadId,
248        input: AgentInput,
249        tool_context: ToolContext<Ctx>,
250        cancel_token: CancellationToken,
251    ) -> (
252        mpsc::Receiver<AgentEventEnvelope>,
253        oneshot::Receiver<AgentRunState>,
254    )
255    where
256        Ctx: Clone,
257    {
258        let (event_tx, event_rx) = mpsc::channel(100);
259        let (state_tx, state_rx) = oneshot::channel();
260        let seq = SequenceCounter::new();
261
262        let provider = Arc::clone(&self.provider);
263        let tools = Arc::clone(&self.tools);
264        let hooks = Arc::clone(&self.hooks);
265        let message_store = Arc::clone(&self.message_store);
266        let state_store = Arc::clone(&self.state_store);
267        let config = self.config.clone();
268        let compaction_config = self.compaction_config.clone();
269        let compactor = self.compactor.clone();
270        let execution_store = self.execution_store.clone();
271
272        tokio::spawn(async move {
273            let result = run_loop(RunLoopParameters {
274                tx: event_tx,
275                seq,
276                thread_id,
277                input,
278                tool_context,
279                provider,
280                tools,
281                hooks,
282                message_store,
283                state_store,
284                config,
285                compaction_config,
286                compactor,
287                execution_store,
288                cancel_token,
289            })
290            .await;
291
292            let _ = state_tx.send(result);
293        });
294
295        (event_rx, state_rx)
296    }
297
298    /// Run a single turn of the agent loop.
299    ///
300    /// Unlike `run()`, this method executes exactly one turn and returns control
301    /// to the caller. This enables external orchestration where each turn can be
302    /// dispatched as a separate message (e.g., via Artemis or another message queue).
303    ///
304    /// When the `cancel_token` is cancelled, the turn will be aborted before
305    /// starting execution and return `TurnOutcome::Cancelled`.
306    ///
307    /// # Arguments
308    ///
309    /// * `thread_id` - The thread identifier for this conversation
310    /// * `input` - Text to start, Resume after confirmation, or Continue after a turn
311    /// * `tool_context` - Context passed to tools
312    /// * `cancel_token` - Token to signal cancellation from outside
313    ///
314    /// # Returns
315    ///
316    /// A tuple of:
317    /// - `mpsc::Receiver<AgentEvent>` - Channel for streaming events from this turn
318    /// - `TurnOutcome` - The turn's outcome
319    ///
320    /// # Turn Outcomes
321    ///
322    /// - `NeedsMoreTurns` - Turn completed, call again with `AgentInput::Continue`
323    /// - `Done` - Agent completed successfully
324    /// - `AwaitingConfirmation` - Tool needs confirmation, call again with `AgentInput::Resume`
325    /// - `Cancelled` - Turn was cancelled via the token
326    /// - `Error` - An error occurred
327    ///
328    /// # Example
329    ///
330    /// ```ignore
331    /// let cancel = CancellationToken::new();
332    /// // Start conversation
333    /// let (events, outcome) = agent.run_turn(
334    ///     thread_id.clone(),
335    ///     AgentInput::Text("What is 2+2?".to_string()),
336    ///     tool_ctx.clone(),
337    ///     cancel,
338    /// ).await;
339    ///
340    /// // Process events...
341    /// while let Some(_envelope) = events.recv().await { /* ... */ }
342    ///
343    /// // Check outcome
344    /// match outcome {
345    ///     TurnOutcome::NeedsMoreTurns { turn, .. } => {
346    ///         // Dispatch another message to continue
347    ///         // (e.g., schedule an Artemis message)
348    ///     }
349    ///     TurnOutcome::Done { .. } => {
350    ///         // Conversation complete
351    ///     }
352    ///     TurnOutcome::AwaitingConfirmation { continuation, .. } => {
353    ///         // Get user confirmation, then resume
354    ///     }
355    ///     TurnOutcome::Error(e) => {
356    ///         // Handle error
357    ///     }
358    /// }
359    /// ```
360    pub fn run_turn(
361        &self,
362        thread_id: ThreadId,
363        input: AgentInput,
364        tool_context: ToolContext<Ctx>,
365        cancel_token: CancellationToken,
366    ) -> (
367        mpsc::Receiver<AgentEventEnvelope>,
368        oneshot::Receiver<TurnOutcome>,
369    )
370    where
371        Ctx: Clone,
372    {
373        let (event_tx, event_rx) = mpsc::channel(100);
374        let (outcome_tx, outcome_rx) = oneshot::channel();
375        let seq = SequenceCounter::new();
376
377        let provider = Arc::clone(&self.provider);
378        let tools = Arc::clone(&self.tools);
379        let hooks = Arc::clone(&self.hooks);
380        let message_store = Arc::clone(&self.message_store);
381        let state_store = Arc::clone(&self.state_store);
382        let config = self.config.clone();
383        let compaction_config = self.compaction_config.clone();
384        let compactor = self.compactor.clone();
385        let execution_store = self.execution_store.clone();
386
387        tokio::spawn(async move {
388            let result = run_single_turn(TurnParameters {
389                tx: event_tx,
390                seq,
391                thread_id,
392                input,
393                tool_context,
394                provider,
395                tools,
396                hooks,
397                message_store,
398                state_store,
399                config,
400                compaction_config,
401                compactor,
402                execution_store,
403                cancel_token,
404            })
405            .await;
406
407            let _ = outcome_tx.send(result);
408        });
409
410        (event_rx, outcome_rx)
411    }
412}