pub struct AgentLoop<Ctx, P, H, M, S>{ /* private fields */ }Expand description
The main agent loop that orchestrates LLM calls and tool execution.
AgentLoop is the core component that:
- Manages conversation state via message and state stores
- Calls the LLM provider and processes responses
- Executes tools through the tool registry
- Emits events for real-time updates via an async channel
- Enforces hooks for tool permissions and lifecycle events
§Type Parameters
Ctx: Application-specific context passed to tools (e.g., user ID, database)P: The LLM provider implementationH: The hooks implementation for lifecycle customizationM: The message store implementationS: The state store implementation
§Event Channel Behavior
The agent uses a bounded channel (capacity 100) for events. Events are sent using non-blocking sends:
- If the channel has space, events are sent immediately
- If the channel is full, the agent waits up to 30 seconds before timing out
- If the receiver is dropped, the agent continues processing without blocking
This design ensures that slow consumers don’t stall the LLM stream, but events may be dropped if the consumer is too slow or disconnects.
§Running the Agent
let (mut events, final_state) = agent.run(
thread_id,
AgentInput::Text("Hello!".to_string()),
tool_ctx,
);
while let Some(event) = events.recv().await {
match event {
AgentEvent::Text { text } => println!("{}", text),
AgentEvent::Done { .. } => break,
_ => {}
}
}Implementations§
Source§impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>where
Ctx: Send + Sync + 'static,
P: LlmProvider + 'static,
H: AgentHooks + 'static,
M: MessageStore + 'static,
S: StateStore + 'static,
impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>where
Ctx: Send + Sync + 'static,
P: LlmProvider + 'static,
H: AgentHooks + 'static,
M: MessageStore + 'static,
S: StateStore + 'static,
Sourcepub fn new(
provider: P,
tools: ToolRegistry<Ctx>,
hooks: H,
message_store: M,
state_store: S,
config: AgentConfig,
) -> Self
pub fn new( provider: P, tools: ToolRegistry<Ctx>, hooks: H, message_store: M, state_store: S, config: AgentConfig, ) -> Self
Create a new agent loop with all components specified directly.
Sourcepub fn with_compaction(
provider: P,
tools: ToolRegistry<Ctx>,
hooks: H,
message_store: M,
state_store: S,
config: AgentConfig,
compaction_config: CompactionConfig,
) -> Self
pub fn with_compaction( provider: P, tools: ToolRegistry<Ctx>, hooks: H, message_store: M, state_store: S, config: AgentConfig, compaction_config: CompactionConfig, ) -> Self
Create a new agent loop with compaction enabled.
Sourcepub fn run(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
) -> (Receiver<AgentEvent>, Receiver<AgentRunState>)where
Ctx: Clone,
pub fn run(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
) -> (Receiver<AgentEvent>, Receiver<AgentRunState>)where
Ctx: Clone,
Run the agent loop.
This method allows the agent to pause when a tool requires confirmation,
returning an AgentRunState::AwaitingConfirmation that contains the
state needed to resume.
§Arguments
thread_id- The thread identifier for this conversationinput- Either a new text message or a resume with confirmation decisiontool_context- Context passed to tools
§Returns
A tuple of:
mpsc::Receiver<AgentEvent>- Channel for streaming eventsoneshot::Receiver<AgentRunState>- Channel for the final state
§Example
let (events, final_state) = agent.run(
thread_id,
AgentInput::Text("Hello".to_string()),
tool_ctx,
);
while let Some(event) = events.recv().await {
// Handle events...
}
match final_state.await.unwrap() {
AgentRunState::Done { .. } => { /* completed */ }
AgentRunState::AwaitingConfirmation { continuation, .. } => {
// Get user decision, then resume:
let (events2, state2) = agent.run(
thread_id,
AgentInput::Resume {
continuation,
tool_call_id: id,
confirmed: true,
rejection_reason: None,
},
tool_ctx,
);
}
AgentRunState::Error(e) => { /* handle error */ }
}Sourcepub fn run_turn(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
) -> (Receiver<AgentEvent>, Receiver<TurnOutcome>)where
Ctx: Clone,
pub fn run_turn(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
) -> (Receiver<AgentEvent>, Receiver<TurnOutcome>)where
Ctx: Clone,
Run a single turn of the agent loop.
Unlike run(), this method executes exactly one turn and returns control
to the caller. This enables external orchestration where each turn can be
dispatched as a separate message (e.g., via Artemis or another message queue).
§Arguments
thread_id- The thread identifier for this conversationinput- Text to start, Resume after confirmation, or Continue after a turntool_context- Context passed to tools
§Returns
A tuple of:
mpsc::Receiver<AgentEvent>- Channel for streaming events from this turnTurnOutcome- The turn’s outcome
§Turn Outcomes
NeedsMoreTurns- Turn completed, call again withAgentInput::ContinueDone- Agent completed successfullyAwaitingConfirmation- Tool needs confirmation, call again withAgentInput::ResumeError- An error occurred
§Example
// Start conversation
let (events, outcome) = agent.run_turn(
thread_id.clone(),
AgentInput::Text("What is 2+2?".to_string()),
tool_ctx.clone(),
).await;
// Process events...
while let Some(event) = events.recv().await { /* ... */ }
// Check outcome
match outcome {
TurnOutcome::NeedsMoreTurns { turn, .. } => {
// Dispatch another message to continue
// (e.g., schedule an Artemis message)
}
TurnOutcome::Done { .. } => {
// Conversation complete
}
TurnOutcome::AwaitingConfirmation { continuation, .. } => {
// Get user confirmation, then resume
}
TurnOutcome::Error(e) => {
// Handle error
}
}