Skip to main content

agent_sdk/
agent_loop.rs

1//! Agent loop orchestration module.
2//!
3//! This module contains the core agent loop that orchestrates LLM calls,
4//! tool execution, and event handling. The agent loop is the main entry point
5//! for running an AI agent.
6//!
7//! # Architecture
8//!
9//! The agent loop works as follows:
10//! 1. Receives a user message
11//! 2. Sends the message to the LLM provider
12//! 3. Processes the LLM response (text or tool calls)
13//! 4. If tool calls are present, executes them and feeds results back to LLM
14//! 5. Repeats until the LLM responds with only text (no tool calls)
15//! 6. Emits events throughout for real-time UI updates
16//!
17//! # Building an Agent
18//!
19//! Use the builder pattern via [`builder()`] or [`AgentLoopBuilder`]:
20//!
21//! ```ignore
22//! use agent_sdk::{builder, providers::AnthropicProvider};
23//!
24//! let agent = builder()
25//!     .provider(AnthropicProvider::sonnet(api_key))
26//!     .tools(my_tools)
27//!     .build();
28//! ```
29
30mod builder;
31mod helpers;
32mod idempotency;
33mod listen;
34mod llm;
35mod run_loop;
36#[cfg(test)]
37mod test_utils;
38#[cfg(test)]
39mod tests;
40mod tool_execution;
41mod turn;
42mod types;
43
44use self::run_loop::{run_loop, run_single_turn};
45use self::types::{RunLoopParameters, TurnParameters};
46
47pub use self::builder::AgentLoopBuilder;
48
49use crate::context::{CompactionConfig, ContextCompactor};
50use crate::events::{AgentEventEnvelope, SequenceCounter};
51use crate::hooks::AgentHooks;
52use crate::llm::LlmProvider;
53use crate::stores::{MessageStore, StateStore, ToolExecutionStore};
54use crate::tools::{ToolContext, ToolRegistry};
55use crate::types::{AgentConfig, AgentInput, AgentRunState, ThreadId, TurnOutcome};
56use std::sync::Arc;
57use tokio::sync::{mpsc, oneshot};
58
59/// The main agent loop that orchestrates LLM calls and tool execution.
60///
61/// `AgentLoop` is the core component that:
62/// - Manages conversation state via message and state stores
63/// - Calls the LLM provider and processes responses
64/// - Executes tools through the tool registry
65/// - Emits events for real-time updates via an async channel
66/// - Enforces hooks for tool permissions and lifecycle events
67///
68/// # Type Parameters
69///
70/// - `Ctx`: Application-specific context passed to tools (e.g., user ID, database)
71/// - `P`: The LLM provider implementation
72/// - `H`: The hooks implementation for lifecycle customization
73/// - `M`: The message store implementation
74/// - `S`: The state store implementation
75///
76/// # Event Channel Behavior
77///
78/// The agent uses a bounded channel (capacity 100) for events. Events are sent
79/// using non-blocking sends:
80///
81/// - If the channel has space, events are sent immediately
82/// - If the channel is full, the agent waits up to 30 seconds before timing out
83/// - If the receiver is dropped, the agent continues processing without blocking
84///
85/// This design ensures that slow consumers don't stall the LLM stream, but events
86/// may be dropped if the consumer is too slow or disconnects.
87///
88/// # Running the Agent
89///
90/// ```ignore
91/// let (mut events, final_state) = agent.run(
92///     thread_id,
93///     AgentInput::Text("Hello!".to_string()),
94///     tool_ctx,
95/// );
96/// while let Some(envelope) = events.recv().await {
97///     match envelope.event {
98///         AgentEvent::Text {
99///             message_id: _,
100///             text,
101///         } => println!("{}", text),
102///         AgentEvent::Done { .. } => break,
103///         _ => {}
104///     }
105/// }
106/// ```
107pub struct AgentLoop<Ctx, P, H, M, S>
108where
109    P: LlmProvider,
110    H: AgentHooks,
111    M: MessageStore,
112    S: StateStore,
113{
114    pub(super) provider: Arc<P>,
115    pub(super) tools: Arc<ToolRegistry<Ctx>>,
116    pub(super) hooks: Arc<H>,
117    pub(super) message_store: Arc<M>,
118    pub(super) state_store: Arc<S>,
119    pub(super) config: AgentConfig,
120    pub(super) compaction_config: Option<CompactionConfig>,
121    pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
122    pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
123}
124
125/// Create a new builder for constructing an `AgentLoop`.
126#[must_use]
127pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
128    AgentLoopBuilder::new()
129}
130
131impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
132where
133    Ctx: Send + Sync + 'static,
134    P: LlmProvider + 'static,
135    H: AgentHooks + 'static,
136    M: MessageStore + 'static,
137    S: StateStore + 'static,
138{
139    /// Create a new agent loop with all components specified directly.
140    #[must_use]
141    pub fn new(
142        provider: P,
143        tools: ToolRegistry<Ctx>,
144        hooks: H,
145        message_store: M,
146        state_store: S,
147        config: AgentConfig,
148    ) -> Self {
149        Self {
150            provider: Arc::new(provider),
151            tools: Arc::new(tools),
152            hooks: Arc::new(hooks),
153            message_store: Arc::new(message_store),
154            state_store: Arc::new(state_store),
155            config,
156            compaction_config: None,
157            compactor: None,
158            execution_store: None,
159        }
160    }
161
162    /// Create a new agent loop with compaction enabled.
163    #[must_use]
164    pub fn with_compaction(
165        provider: P,
166        tools: ToolRegistry<Ctx>,
167        hooks: H,
168        message_store: M,
169        state_store: S,
170        config: AgentConfig,
171        compaction_config: CompactionConfig,
172    ) -> Self {
173        Self {
174            provider: Arc::new(provider),
175            tools: Arc::new(tools),
176            hooks: Arc::new(hooks),
177            message_store: Arc::new(message_store),
178            state_store: Arc::new(state_store),
179            config,
180            compaction_config: Some(compaction_config),
181            compactor: None,
182            execution_store: None,
183        }
184    }
185
186    /// Run the agent loop.
187    ///
188    /// This method allows the agent to pause when a tool requires confirmation,
189    /// returning an `AgentRunState::AwaitingConfirmation` that contains the
190    /// state needed to resume.
191    ///
192    /// # Arguments
193    ///
194    /// * `thread_id` - The thread identifier for this conversation
195    /// * `input` - Either a new text message or a resume with confirmation decision
196    /// * `tool_context` - Context passed to tools
197    ///
198    /// # Returns
199    ///
200    /// A tuple of:
201    /// - `mpsc::Receiver<AgentEvent>` - Channel for streaming events
202    /// - `oneshot::Receiver<AgentRunState>` - Channel for the final state
203    ///
204    /// # Example
205    ///
206    /// ```ignore
207    /// let (events, final_state) = agent.run(
208    ///     thread_id,
209    ///     AgentInput::Text("Hello".to_string()),
210    ///     tool_ctx,
211    /// );
212    ///
213    /// while let Some(envelope) = events.recv().await {
214    ///     // Handle events...
215    /// }
216    ///
217    /// match final_state.await.unwrap() {
218    ///     AgentRunState::Done { .. } => { /* completed */ }
219    ///     AgentRunState::AwaitingConfirmation { continuation, .. } => {
220    ///         // Get user decision, then resume:
221    ///         let (events2, state2) = agent.run(
222    ///             thread_id,
223    ///             AgentInput::Resume {
224    ///                 continuation,
225    ///                 tool_call_id: id,
226    ///                 confirmed: true,
227    ///                 rejection_reason: None,
228    ///             },
229    ///             tool_ctx,
230    ///         );
231    ///     }
232    ///     AgentRunState::Error(e) => { /* handle error */ }
233    /// }
234    /// ```
235    pub fn run(
236        &self,
237        thread_id: ThreadId,
238        input: AgentInput,
239        tool_context: ToolContext<Ctx>,
240    ) -> (
241        mpsc::Receiver<AgentEventEnvelope>,
242        oneshot::Receiver<AgentRunState>,
243    )
244    where
245        Ctx: Clone,
246    {
247        let (event_tx, event_rx) = mpsc::channel(100);
248        let (state_tx, state_rx) = oneshot::channel();
249        let seq = SequenceCounter::new();
250
251        let provider = Arc::clone(&self.provider);
252        let tools = Arc::clone(&self.tools);
253        let hooks = Arc::clone(&self.hooks);
254        let message_store = Arc::clone(&self.message_store);
255        let state_store = Arc::clone(&self.state_store);
256        let config = self.config.clone();
257        let compaction_config = self.compaction_config.clone();
258        let compactor = self.compactor.clone();
259        let execution_store = self.execution_store.clone();
260
261        tokio::spawn(async move {
262            let result = run_loop(RunLoopParameters {
263                tx: event_tx,
264                seq,
265                thread_id,
266                input,
267                tool_context,
268                provider,
269                tools,
270                hooks,
271                message_store,
272                state_store,
273                config,
274                compaction_config,
275                compactor,
276                execution_store,
277            })
278            .await;
279
280            let _ = state_tx.send(result);
281        });
282
283        (event_rx, state_rx)
284    }
285
286    /// Run a single turn of the agent loop.
287    ///
288    /// Unlike `run()`, this method executes exactly one turn and returns control
289    /// to the caller. This enables external orchestration where each turn can be
290    /// dispatched as a separate message (e.g., via Artemis or another message queue).
291    ///
292    /// # Arguments
293    ///
294    /// * `thread_id` - The thread identifier for this conversation
295    /// * `input` - Text to start, Resume after confirmation, or Continue after a turn
296    /// * `tool_context` - Context passed to tools
297    ///
298    /// # Returns
299    ///
300    /// A tuple of:
301    /// - `mpsc::Receiver<AgentEvent>` - Channel for streaming events from this turn
302    /// - `TurnOutcome` - The turn's outcome
303    ///
304    /// # Turn Outcomes
305    ///
306    /// - `NeedsMoreTurns` - Turn completed, call again with `AgentInput::Continue`
307    /// - `Done` - Agent completed successfully
308    /// - `AwaitingConfirmation` - Tool needs confirmation, call again with `AgentInput::Resume`
309    /// - `Error` - An error occurred
310    ///
311    /// # Example
312    ///
313    /// ```ignore
314    /// // Start conversation
315    /// let (events, outcome) = agent.run_turn(
316    ///     thread_id.clone(),
317    ///     AgentInput::Text("What is 2+2?".to_string()),
318    ///     tool_ctx.clone(),
319    /// ).await;
320    ///
321    /// // Process events...
322    /// while let Some(_envelope) = events.recv().await { /* ... */ }
323    ///
324    /// // Check outcome
325    /// match outcome {
326    ///     TurnOutcome::NeedsMoreTurns { turn, .. } => {
327    ///         // Dispatch another message to continue
328    ///         // (e.g., schedule an Artemis message)
329    ///     }
330    ///     TurnOutcome::Done { .. } => {
331    ///         // Conversation complete
332    ///     }
333    ///     TurnOutcome::AwaitingConfirmation { continuation, .. } => {
334    ///         // Get user confirmation, then resume
335    ///     }
336    ///     TurnOutcome::Error(e) => {
337    ///         // Handle error
338    ///     }
339    /// }
340    /// ```
341    pub fn run_turn(
342        &self,
343        thread_id: ThreadId,
344        input: AgentInput,
345        tool_context: ToolContext<Ctx>,
346    ) -> (
347        mpsc::Receiver<AgentEventEnvelope>,
348        oneshot::Receiver<TurnOutcome>,
349    )
350    where
351        Ctx: Clone,
352    {
353        let (event_tx, event_rx) = mpsc::channel(100);
354        let (outcome_tx, outcome_rx) = oneshot::channel();
355        let seq = SequenceCounter::new();
356
357        let provider = Arc::clone(&self.provider);
358        let tools = Arc::clone(&self.tools);
359        let hooks = Arc::clone(&self.hooks);
360        let message_store = Arc::clone(&self.message_store);
361        let state_store = Arc::clone(&self.state_store);
362        let config = self.config.clone();
363        let compaction_config = self.compaction_config.clone();
364        let compactor = self.compactor.clone();
365        let execution_store = self.execution_store.clone();
366
367        tokio::spawn(async move {
368            let result = run_single_turn(TurnParameters {
369                tx: event_tx,
370                seq,
371                thread_id,
372                input,
373                tool_context,
374                provider,
375                tools,
376                hooks,
377                message_store,
378                state_store,
379                config,
380                compaction_config,
381                compactor,
382                execution_store,
383            })
384            .await;
385
386            let _ = outcome_tx.send(result);
387        });
388
389        (event_rx, outcome_rx)
390    }
391}