agent_sdk/agent_loop.rs
1//! Agent loop orchestration module.
2//!
3//! This module contains the core agent loop that orchestrates LLM calls,
4//! tool execution, and event handling. The agent loop is the main entry point
5//! for running an AI agent.
6//!
7//! # Architecture
8//!
9//! The agent loop works as follows:
10//! 1. Receives a user message
11//! 2. Sends the message to the LLM provider
12//! 3. Processes the LLM response (text or tool calls)
13//! 4. If tool calls are present, executes them and feeds results back to LLM
14//! 5. Repeats until the LLM responds with only text (no tool calls)
15//! 6. Emits events throughout for real-time UI updates
16//!
17//! # Building an Agent
18//!
19//! Use the builder pattern via [`builder()`] or [`AgentLoopBuilder`]:
20//!
21//! ```ignore
22//! use agent_sdk::{builder, providers::AnthropicProvider};
23//!
24//! let agent = builder()
25//! .provider(AnthropicProvider::sonnet(api_key))
26//! .tools(my_tools)
27//! .build();
28//! ```
29
30mod builder;
31mod helpers;
32mod idempotency;
33mod listen;
34mod llm;
35mod run_loop;
36#[cfg(test)]
37mod test_utils;
38#[cfg(test)]
39mod tests;
40mod tool_execution;
41mod turn;
42mod types;
43
44use self::run_loop::{run_loop, run_single_turn};
45use self::types::{RunLoopParameters, TurnParameters};
46
47pub use self::builder::AgentLoopBuilder;
48
49use crate::context::{CompactionConfig, ContextCompactor};
50use crate::events::{AgentEventEnvelope, SequenceCounter};
51use crate::hooks::AgentHooks;
52use crate::llm::LlmProvider;
53use crate::stores::{MessageStore, StateStore, ToolExecutionStore};
54use crate::tools::{ToolContext, ToolRegistry};
55use crate::types::{AgentConfig, AgentInput, AgentRunState, ThreadId, TurnOutcome};
56use std::sync::Arc;
57use tokio::sync::{mpsc, oneshot};
58
59/// The main agent loop that orchestrates LLM calls and tool execution.
60///
61/// `AgentLoop` is the core component that:
62/// - Manages conversation state via message and state stores
63/// - Calls the LLM provider and processes responses
64/// - Executes tools through the tool registry
65/// - Emits events for real-time updates via an async channel
66/// - Enforces hooks for tool permissions and lifecycle events
67///
68/// # Type Parameters
69///
70/// - `Ctx`: Application-specific context passed to tools (e.g., user ID, database)
71/// - `P`: The LLM provider implementation
72/// - `H`: The hooks implementation for lifecycle customization
73/// - `M`: The message store implementation
74/// - `S`: The state store implementation
75///
76/// # Event Channel Behavior
77///
78/// The agent uses a bounded channel (capacity 100) for events. Events are sent
79/// using non-blocking sends:
80///
81/// - If the channel has space, events are sent immediately
82/// - If the channel is full, the agent waits up to 30 seconds before timing out
83/// - If the receiver is dropped, the agent continues processing without blocking
84///
85/// This design ensures that slow consumers don't stall the LLM stream, but events
86/// may be dropped if the consumer is too slow or disconnects.
87///
88/// # Running the Agent
89///
90/// ```ignore
91/// let (mut events, final_state) = agent.run(
92/// thread_id,
93/// AgentInput::Text("Hello!".to_string()),
94/// tool_ctx,
95/// );
96/// while let Some(envelope) = events.recv().await {
97/// match envelope.event {
98/// AgentEvent::Text {
99/// message_id: _,
100/// text,
101/// } => println!("{}", text),
102/// AgentEvent::Done { .. } => break,
103/// _ => {}
104/// }
105/// }
106/// ```
107pub struct AgentLoop<Ctx, P, H, M, S>
108where
109 P: LlmProvider,
110 H: AgentHooks,
111 M: MessageStore,
112 S: StateStore,
113{
114 pub(super) provider: Arc<P>,
115 pub(super) tools: Arc<ToolRegistry<Ctx>>,
116 pub(super) hooks: Arc<H>,
117 pub(super) message_store: Arc<M>,
118 pub(super) state_store: Arc<S>,
119 pub(super) config: AgentConfig,
120 pub(super) compaction_config: Option<CompactionConfig>,
121 pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
122 pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
123}
124
125/// Create a new builder for constructing an `AgentLoop`.
126#[must_use]
127pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
128 AgentLoopBuilder::new()
129}
130
131impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
132where
133 Ctx: Send + Sync + 'static,
134 P: LlmProvider + 'static,
135 H: AgentHooks + 'static,
136 M: MessageStore + 'static,
137 S: StateStore + 'static,
138{
139 /// Create a new agent loop with all components specified directly.
140 #[must_use]
141 pub fn new(
142 provider: P,
143 tools: ToolRegistry<Ctx>,
144 hooks: H,
145 message_store: M,
146 state_store: S,
147 config: AgentConfig,
148 ) -> Self {
149 Self {
150 provider: Arc::new(provider),
151 tools: Arc::new(tools),
152 hooks: Arc::new(hooks),
153 message_store: Arc::new(message_store),
154 state_store: Arc::new(state_store),
155 config,
156 compaction_config: None,
157 compactor: None,
158 execution_store: None,
159 }
160 }
161
162 /// Create a new agent loop with compaction enabled.
163 #[must_use]
164 pub fn with_compaction(
165 provider: P,
166 tools: ToolRegistry<Ctx>,
167 hooks: H,
168 message_store: M,
169 state_store: S,
170 config: AgentConfig,
171 compaction_config: CompactionConfig,
172 ) -> Self {
173 Self {
174 provider: Arc::new(provider),
175 tools: Arc::new(tools),
176 hooks: Arc::new(hooks),
177 message_store: Arc::new(message_store),
178 state_store: Arc::new(state_store),
179 config,
180 compaction_config: Some(compaction_config),
181 compactor: None,
182 execution_store: None,
183 }
184 }
185
186 /// Run the agent loop.
187 ///
188 /// This method allows the agent to pause when a tool requires confirmation,
189 /// returning an `AgentRunState::AwaitingConfirmation` that contains the
190 /// state needed to resume.
191 ///
192 /// # Arguments
193 ///
194 /// * `thread_id` - The thread identifier for this conversation
195 /// * `input` - Either a new text message or a resume with confirmation decision
196 /// * `tool_context` - Context passed to tools
197 ///
198 /// # Returns
199 ///
200 /// A tuple of:
201 /// - `mpsc::Receiver<AgentEvent>` - Channel for streaming events
202 /// - `oneshot::Receiver<AgentRunState>` - Channel for the final state
203 ///
204 /// # Example
205 ///
206 /// ```ignore
207 /// let (events, final_state) = agent.run(
208 /// thread_id,
209 /// AgentInput::Text("Hello".to_string()),
210 /// tool_ctx,
211 /// );
212 ///
213 /// while let Some(envelope) = events.recv().await {
214 /// // Handle events...
215 /// }
216 ///
217 /// match final_state.await.unwrap() {
218 /// AgentRunState::Done { .. } => { /* completed */ }
219 /// AgentRunState::AwaitingConfirmation { continuation, .. } => {
220 /// // Get user decision, then resume:
221 /// let (events2, state2) = agent.run(
222 /// thread_id,
223 /// AgentInput::Resume {
224 /// continuation,
225 /// tool_call_id: id,
226 /// confirmed: true,
227 /// rejection_reason: None,
228 /// },
229 /// tool_ctx,
230 /// );
231 /// }
232 /// AgentRunState::Error(e) => { /* handle error */ }
233 /// }
234 /// ```
235 pub fn run(
236 &self,
237 thread_id: ThreadId,
238 input: AgentInput,
239 tool_context: ToolContext<Ctx>,
240 ) -> (
241 mpsc::Receiver<AgentEventEnvelope>,
242 oneshot::Receiver<AgentRunState>,
243 )
244 where
245 Ctx: Clone,
246 {
247 let (event_tx, event_rx) = mpsc::channel(100);
248 let (state_tx, state_rx) = oneshot::channel();
249 let seq = SequenceCounter::new();
250
251 let provider = Arc::clone(&self.provider);
252 let tools = Arc::clone(&self.tools);
253 let hooks = Arc::clone(&self.hooks);
254 let message_store = Arc::clone(&self.message_store);
255 let state_store = Arc::clone(&self.state_store);
256 let config = self.config.clone();
257 let compaction_config = self.compaction_config.clone();
258 let compactor = self.compactor.clone();
259 let execution_store = self.execution_store.clone();
260
261 tokio::spawn(async move {
262 let result = run_loop(RunLoopParameters {
263 tx: event_tx,
264 seq,
265 thread_id,
266 input,
267 tool_context,
268 provider,
269 tools,
270 hooks,
271 message_store,
272 state_store,
273 config,
274 compaction_config,
275 compactor,
276 execution_store,
277 })
278 .await;
279
280 let _ = state_tx.send(result);
281 });
282
283 (event_rx, state_rx)
284 }
285
286 /// Run a single turn of the agent loop.
287 ///
288 /// Unlike `run()`, this method executes exactly one turn and returns control
289 /// to the caller. This enables external orchestration where each turn can be
290 /// dispatched as a separate message (e.g., via Artemis or another message queue).
291 ///
292 /// # Arguments
293 ///
294 /// * `thread_id` - The thread identifier for this conversation
295 /// * `input` - Text to start, Resume after confirmation, or Continue after a turn
296 /// * `tool_context` - Context passed to tools
297 ///
298 /// # Returns
299 ///
300 /// A tuple of:
301 /// - `mpsc::Receiver<AgentEvent>` - Channel for streaming events from this turn
302 /// - `TurnOutcome` - The turn's outcome
303 ///
304 /// # Turn Outcomes
305 ///
306 /// - `NeedsMoreTurns` - Turn completed, call again with `AgentInput::Continue`
307 /// - `Done` - Agent completed successfully
308 /// - `AwaitingConfirmation` - Tool needs confirmation, call again with `AgentInput::Resume`
309 /// - `Error` - An error occurred
310 ///
311 /// # Example
312 ///
313 /// ```ignore
314 /// // Start conversation
315 /// let (events, outcome) = agent.run_turn(
316 /// thread_id.clone(),
317 /// AgentInput::Text("What is 2+2?".to_string()),
318 /// tool_ctx.clone(),
319 /// ).await;
320 ///
321 /// // Process events...
322 /// while let Some(_envelope) = events.recv().await { /* ... */ }
323 ///
324 /// // Check outcome
325 /// match outcome {
326 /// TurnOutcome::NeedsMoreTurns { turn, .. } => {
327 /// // Dispatch another message to continue
328 /// // (e.g., schedule an Artemis message)
329 /// }
330 /// TurnOutcome::Done { .. } => {
331 /// // Conversation complete
332 /// }
333 /// TurnOutcome::AwaitingConfirmation { continuation, .. } => {
334 /// // Get user confirmation, then resume
335 /// }
336 /// TurnOutcome::Error(e) => {
337 /// // Handle error
338 /// }
339 /// }
340 /// ```
341 pub fn run_turn(
342 &self,
343 thread_id: ThreadId,
344 input: AgentInput,
345 tool_context: ToolContext<Ctx>,
346 ) -> (
347 mpsc::Receiver<AgentEventEnvelope>,
348 oneshot::Receiver<TurnOutcome>,
349 )
350 where
351 Ctx: Clone,
352 {
353 let (event_tx, event_rx) = mpsc::channel(100);
354 let (outcome_tx, outcome_rx) = oneshot::channel();
355 let seq = SequenceCounter::new();
356
357 let provider = Arc::clone(&self.provider);
358 let tools = Arc::clone(&self.tools);
359 let hooks = Arc::clone(&self.hooks);
360 let message_store = Arc::clone(&self.message_store);
361 let state_store = Arc::clone(&self.state_store);
362 let config = self.config.clone();
363 let compaction_config = self.compaction_config.clone();
364 let compactor = self.compactor.clone();
365 let execution_store = self.execution_store.clone();
366
367 tokio::spawn(async move {
368 let result = run_single_turn(TurnParameters {
369 tx: event_tx,
370 seq,
371 thread_id,
372 input,
373 tool_context,
374 provider,
375 tools,
376 hooks,
377 message_store,
378 state_store,
379 config,
380 compaction_config,
381 compactor,
382 execution_store,
383 })
384 .await;
385
386 let _ = outcome_tx.send(result);
387 });
388
389 (event_rx, outcome_rx)
390 }
391}