agent_sdk/agent_loop.rs
1//! Agent loop orchestration module.
2//!
3//! This module contains the core agent loop that orchestrates LLM calls,
4//! tool execution, and event handling. The agent loop is the main entry point
5//! for running an AI agent.
6//!
7//! # Architecture
8//!
9//! The agent loop works as follows:
10//! 1. Receives a user message
11//! 2. Sends the message to the LLM provider
12//! 3. Processes the LLM response (text or tool calls)
13//! 4. If tool calls are present, executes them and feeds results back to LLM
14//! 5. Repeats until the LLM responds with only text (no tool calls)
15//! 6. Emits events throughout for real-time UI updates
16//!
17//! # Building an Agent
18//!
19//! Use the builder pattern via [`builder()`] or [`AgentLoopBuilder`]:
20//!
21//! ```ignore
22//! use agent_sdk::{builder, providers::AnthropicProvider};
23//!
24//! let agent = builder()
25//! .provider(AnthropicProvider::sonnet(api_key))
26//! .tools(my_tools)
27//! .build();
28//! ```
29
30mod builder;
31mod helpers;
32mod idempotency;
33mod listen;
34mod llm;
35mod run_loop;
36#[cfg(test)]
37mod test_utils;
38#[cfg(test)]
39mod tests;
40mod tool_execution;
41mod turn;
42mod types;
43
44use self::run_loop::{run_loop, run_single_turn};
45use self::types::{RunLoopParameters, TurnParameters};
46
47pub use self::builder::AgentLoopBuilder;
48
49use crate::context::{CompactionConfig, ContextCompactor};
50use crate::events::{AgentEventEnvelope, SequenceCounter};
51use crate::hooks::AgentHooks;
52use crate::llm::LlmProvider;
53use crate::stores::{MessageStore, StateStore, ToolExecutionStore};
54use crate::tools::{ToolContext, ToolRegistry};
55use crate::types::{AgentConfig, AgentInput, AgentRunState, ThreadId, TurnOutcome};
56use std::sync::Arc;
57use tokio::sync::{mpsc, oneshot};
58use tokio_util::sync::CancellationToken;
59
60/// The main agent loop that orchestrates LLM calls and tool execution.
61///
62/// `AgentLoop` is the core component that:
63/// - Manages conversation state via message and state stores
64/// - Calls the LLM provider and processes responses
65/// - Executes tools through the tool registry
66/// - Emits events for real-time updates via an async channel
67/// - Enforces hooks for tool permissions and lifecycle events
68///
69/// # Type Parameters
70///
71/// - `Ctx`: Application-specific context passed to tools (e.g., user ID, database)
72/// - `P`: The LLM provider implementation
73/// - `H`: The hooks implementation for lifecycle customization
74/// - `M`: The message store implementation
75/// - `S`: The state store implementation
76///
77/// # Event Channel Behavior
78///
79/// The agent uses a bounded channel (capacity 100) for events. Events are sent
80/// using non-blocking sends:
81///
82/// - If the channel has space, events are sent immediately
83/// - If the channel is full, the agent waits up to 30 seconds before timing out
84/// - If the receiver is dropped, the agent continues processing without blocking
85///
86/// This design ensures that slow consumers don't stall the LLM stream, but events
87/// may be dropped if the consumer is too slow or disconnects.
88///
89/// # Running the Agent
90///
91/// ```ignore
92/// let (mut events, final_state) = agent.run(
93/// thread_id,
94/// AgentInput::Text("Hello!".to_string()),
95/// tool_ctx,
96/// );
97/// while let Some(envelope) = events.recv().await {
98/// match envelope.event {
99/// AgentEvent::Text {
100/// message_id: _,
101/// text,
102/// } => println!("{}", text),
103/// AgentEvent::Done { .. } => break,
104/// _ => {}
105/// }
106/// }
107/// ```
108pub struct AgentLoop<Ctx, P, H, M, S>
109where
110 P: LlmProvider,
111 H: AgentHooks,
112 M: MessageStore,
113 S: StateStore,
114{
115 pub(super) provider: Arc<P>,
116 pub(super) tools: Arc<ToolRegistry<Ctx>>,
117 pub(super) hooks: Arc<H>,
118 pub(super) message_store: Arc<M>,
119 pub(super) state_store: Arc<S>,
120 pub(super) config: AgentConfig,
121 pub(super) compaction_config: Option<CompactionConfig>,
122 pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
123 pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
124}
125
126/// Create a new builder for constructing an `AgentLoop`.
127#[must_use]
128pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
129 AgentLoopBuilder::new()
130}
131
132impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
133where
134 Ctx: Send + Sync + 'static,
135 P: LlmProvider + 'static,
136 H: AgentHooks + 'static,
137 M: MessageStore + 'static,
138 S: StateStore + 'static,
139{
140 /// Create a new agent loop with all components specified directly.
141 #[must_use]
142 pub fn new(
143 provider: P,
144 tools: ToolRegistry<Ctx>,
145 hooks: H,
146 message_store: M,
147 state_store: S,
148 config: AgentConfig,
149 ) -> Self {
150 Self {
151 provider: Arc::new(provider),
152 tools: Arc::new(tools),
153 hooks: Arc::new(hooks),
154 message_store: Arc::new(message_store),
155 state_store: Arc::new(state_store),
156 config,
157 compaction_config: None,
158 compactor: None,
159 execution_store: None,
160 }
161 }
162
163 /// Create a new agent loop with compaction enabled.
164 #[must_use]
165 pub fn with_compaction(
166 provider: P,
167 tools: ToolRegistry<Ctx>,
168 hooks: H,
169 message_store: M,
170 state_store: S,
171 config: AgentConfig,
172 compaction_config: CompactionConfig,
173 ) -> Self {
174 Self {
175 provider: Arc::new(provider),
176 tools: Arc::new(tools),
177 hooks: Arc::new(hooks),
178 message_store: Arc::new(message_store),
179 state_store: Arc::new(state_store),
180 config,
181 compaction_config: Some(compaction_config),
182 compactor: None,
183 execution_store: None,
184 }
185 }
186
187 /// Run the agent loop.
188 ///
189 /// This method allows the agent to pause when a tool requires confirmation,
190 /// returning an `AgentRunState::AwaitingConfirmation` that contains the
191 /// state needed to resume.
192 ///
193 /// When the `cancel_token` is cancelled, the agent will stop after the
194 /// current turn completes (no new turns will start). The final state will
195 /// be `AgentRunState::Cancelled`.
196 ///
197 /// # Arguments
198 ///
199 /// * `thread_id` - The thread identifier for this conversation
200 /// * `input` - Either a new text message or a resume with confirmation decision
201 /// * `tool_context` - Context passed to tools
202 /// * `cancel_token` - Token to signal cancellation from outside
203 ///
204 /// # Returns
205 ///
206 /// A tuple of:
207 /// - `mpsc::Receiver<AgentEvent>` - Channel for streaming events
208 /// - `oneshot::Receiver<AgentRunState>` - Channel for the final state
209 ///
210 /// # Example
211 ///
212 /// ```ignore
213 /// let cancel = CancellationToken::new();
214 /// let (events, final_state) = agent.run(
215 /// thread_id,
216 /// AgentInput::Text("Hello".to_string()),
217 /// tool_ctx,
218 /// cancel.clone(),
219 /// );
220 ///
221 /// while let Some(envelope) = events.recv().await {
222 /// // Handle events...
223 /// }
224 ///
225 /// match final_state.await.unwrap() {
226 /// AgentRunState::Done { .. } => { /* completed */ }
227 /// AgentRunState::Cancelled { .. } => { /* user cancelled */ }
228 /// AgentRunState::AwaitingConfirmation { continuation, .. } => {
229 /// // Get user decision, then resume:
230 /// let (events2, state2) = agent.run(
231 /// thread_id,
232 /// AgentInput::Resume {
233 /// continuation,
234 /// tool_call_id: id,
235 /// confirmed: true,
236 /// rejection_reason: None,
237 /// },
238 /// tool_ctx,
239 /// cancel.clone(),
240 /// );
241 /// }
242 /// AgentRunState::Error(e) => { /* handle error */ }
243 /// }
244 /// ```
245 pub fn run(
246 &self,
247 thread_id: ThreadId,
248 input: AgentInput,
249 tool_context: ToolContext<Ctx>,
250 cancel_token: CancellationToken,
251 ) -> (
252 mpsc::Receiver<AgentEventEnvelope>,
253 oneshot::Receiver<AgentRunState>,
254 )
255 where
256 Ctx: Clone,
257 {
258 let (event_tx, event_rx) = mpsc::channel(100);
259 let (state_tx, state_rx) = oneshot::channel();
260 let seq = SequenceCounter::new();
261
262 let provider = Arc::clone(&self.provider);
263 let tools = Arc::clone(&self.tools);
264 let hooks = Arc::clone(&self.hooks);
265 let message_store = Arc::clone(&self.message_store);
266 let state_store = Arc::clone(&self.state_store);
267 let config = self.config.clone();
268 let compaction_config = self.compaction_config.clone();
269 let compactor = self.compactor.clone();
270 let execution_store = self.execution_store.clone();
271
272 tokio::spawn(async move {
273 let result = run_loop(RunLoopParameters {
274 tx: event_tx,
275 seq,
276 thread_id,
277 input,
278 tool_context,
279 provider,
280 tools,
281 hooks,
282 message_store,
283 state_store,
284 config,
285 compaction_config,
286 compactor,
287 execution_store,
288 cancel_token,
289 })
290 .await;
291
292 let _ = state_tx.send(result);
293 });
294
295 (event_rx, state_rx)
296 }
297
298 /// Run a single turn of the agent loop.
299 ///
300 /// Unlike `run()`, this method executes exactly one turn and returns control
301 /// to the caller. This enables external orchestration where each turn can be
302 /// dispatched as a separate message (e.g., via Artemis or another message queue).
303 ///
304 /// When the `cancel_token` is cancelled, the turn will be aborted before
305 /// starting execution and return `TurnOutcome::Cancelled`.
306 ///
307 /// # Arguments
308 ///
309 /// * `thread_id` - The thread identifier for this conversation
310 /// * `input` - Text to start, Resume after confirmation, or Continue after a turn
311 /// * `tool_context` - Context passed to tools
312 /// * `cancel_token` - Token to signal cancellation from outside
313 ///
314 /// # Returns
315 ///
316 /// A tuple of:
317 /// - `mpsc::Receiver<AgentEvent>` - Channel for streaming events from this turn
318 /// - `TurnOutcome` - The turn's outcome
319 ///
320 /// # Turn Outcomes
321 ///
322 /// - `NeedsMoreTurns` - Turn completed, call again with `AgentInput::Continue`
323 /// - `Done` - Agent completed successfully
324 /// - `AwaitingConfirmation` - Tool needs confirmation, call again with `AgentInput::Resume`
325 /// - `Cancelled` - Turn was cancelled via the token
326 /// - `Error` - An error occurred
327 ///
328 /// # Example
329 ///
330 /// ```ignore
331 /// let cancel = CancellationToken::new();
332 /// // Start conversation
333 /// let (events, outcome) = agent.run_turn(
334 /// thread_id.clone(),
335 /// AgentInput::Text("What is 2+2?".to_string()),
336 /// tool_ctx.clone(),
337 /// cancel,
338 /// ).await;
339 ///
340 /// // Process events...
341 /// while let Some(_envelope) = events.recv().await { /* ... */ }
342 ///
343 /// // Check outcome
344 /// match outcome {
345 /// TurnOutcome::NeedsMoreTurns { turn, .. } => {
346 /// // Dispatch another message to continue
347 /// // (e.g., schedule an Artemis message)
348 /// }
349 /// TurnOutcome::Done { .. } => {
350 /// // Conversation complete
351 /// }
352 /// TurnOutcome::AwaitingConfirmation { continuation, .. } => {
353 /// // Get user confirmation, then resume
354 /// }
355 /// TurnOutcome::Error(e) => {
356 /// // Handle error
357 /// }
358 /// }
359 /// ```
360 pub fn run_turn(
361 &self,
362 thread_id: ThreadId,
363 input: AgentInput,
364 tool_context: ToolContext<Ctx>,
365 cancel_token: CancellationToken,
366 ) -> (
367 mpsc::Receiver<AgentEventEnvelope>,
368 oneshot::Receiver<TurnOutcome>,
369 )
370 where
371 Ctx: Clone,
372 {
373 let (event_tx, event_rx) = mpsc::channel(100);
374 let (outcome_tx, outcome_rx) = oneshot::channel();
375 let seq = SequenceCounter::new();
376
377 let provider = Arc::clone(&self.provider);
378 let tools = Arc::clone(&self.tools);
379 let hooks = Arc::clone(&self.hooks);
380 let message_store = Arc::clone(&self.message_store);
381 let state_store = Arc::clone(&self.state_store);
382 let config = self.config.clone();
383 let compaction_config = self.compaction_config.clone();
384 let compactor = self.compactor.clone();
385 let execution_store = self.execution_store.clone();
386
387 tokio::spawn(async move {
388 let result = run_single_turn(TurnParameters {
389 tx: event_tx,
390 seq,
391 thread_id,
392 input,
393 tool_context,
394 provider,
395 tools,
396 hooks,
397 message_store,
398 state_store,
399 config,
400 compaction_config,
401 compactor,
402 execution_store,
403 cancel_token,
404 })
405 .await;
406
407 let _ = outcome_tx.send(result);
408 });
409
410 (event_rx, outcome_rx)
411 }
412}