pub struct AgentLoop<Ctx, P, H, M, S>{ /* private fields */ }Expand description
The main agent loop that orchestrates LLM calls and tool execution.
AgentLoop is the core component that:
- Manages conversation state via message and state stores
- Calls the LLM provider and processes responses
- Executes tools through the tool registry
- Persists events to the configured event store
- Enforces hooks for tool permissions and lifecycle events
§Type Parameters
Ctx: Application-specific context passed to tools (e.g., user ID, database)P: The LLM provider implementationH: The hooks implementation for lifecycle customizationM: The message store implementationS: The state store implementation
§Event Storage
Every loop instance requires an EventStore configured at construction
time. Events are written to that store for the entire lifecycle of the loop,
and callers read them back from the store instead of receiving an in-process
channel from the runtime.
§Running the Agent
let final_state = agent.run(
thread_id,
AgentInput::Text("Hello!".to_string()),
tool_ctx,
);
let state = final_state.await?;
let events = event_store.get_events(&thread_id).await?;Implementations§
Source§impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>where
Ctx: Send + Sync + 'static,
P: LlmProvider + 'static,
H: AgentHooks + 'static,
M: MessageStore + 'static,
S: StateStore + 'static,
impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>where
Ctx: Send + Sync + 'static,
P: LlmProvider + 'static,
H: AgentHooks + 'static,
M: MessageStore + 'static,
S: StateStore + 'static,
Sourcepub fn new(
provider: P,
tools: ToolRegistry<Ctx>,
hooks: H,
message_store: M,
state_store: S,
event_store: Arc<dyn EventStore>,
config: AgentConfig,
) -> Self
pub fn new( provider: P, tools: ToolRegistry<Ctx>, hooks: H, message_store: M, state_store: S, event_store: Arc<dyn EventStore>, config: AgentConfig, ) -> Self
Create a new agent loop with all components specified directly.
Sourcepub fn with_compaction(
provider: P,
tools: ToolRegistry<Ctx>,
hooks: H,
message_store: M,
state_store: S,
event_store: Arc<dyn EventStore>,
config: AgentLoopCompactionConfig,
) -> Self
pub fn with_compaction( provider: P, tools: ToolRegistry<Ctx>, hooks: H, message_store: M, state_store: S, event_store: Arc<dyn EventStore>, config: AgentLoopCompactionConfig, ) -> Self
Create a new agent loop with compaction enabled.
Sourcepub fn with_audit_sink(self, sink: impl ToolAuditSink + 'static) -> Self
pub fn with_audit_sink(self, sink: impl ToolAuditSink + 'static) -> Self
Set the authoritative tool audit sink.
When set, the loop emits a ToolAuditRecord
at every tool-lifecycle transition (blocked, requires-confirmation,
cached, replayed, invalidated, completed, persistence-failed).
The default is NoopAuditSink which
discards every record — suitable for local/CLI usage. Servers should
swap in a durable sink.
Sourcepub fn with_observability_store(
self,
store: impl ObservabilityStore + 'static,
) -> Self
Available on crate feature otel only.
pub fn with_observability_store( self, store: impl ObservabilityStore + 'static, ) -> Self
otel only.Set the observability store for GenAI payload capture.
When set, the store is called at each LLM request boundary to decide whether payloads are inlined on spans, externalized, or omitted.
Sourcepub fn run(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
) -> impl Future<Output = Result<AgentRunState>> + Send + 'staticwhere
Ctx: Clone,
pub fn run(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
) -> impl Future<Output = Result<AgentRunState>> + Send + 'staticwhere
Ctx: Clone,
Run the agent loop.
This method allows the agent to pause when a tool requires confirmation,
returning an AgentRunState::AwaitingConfirmation that contains the
state needed to resume.
When the cancel_token is cancelled, the agent will stop after the
current turn completes (no new turns will start). The final state will
be AgentRunState::Cancelled.
§Arguments
thread_id- The thread identifier for this conversationinput- Either a new text message or a resume with confirmation decisiontool_context- Context passed to toolscancel_token- Token to signal cancellation from outside
§Returns
A future that resolves to the final AgentRunState. Awaiting it
drives the run to completion:
let final_state = agent.run(thread_id, input, tool_ctx, cancel).await?;The future is 'static — the run is already spawned on a Tokio task
before this returns, so dropping the future does not stop the run
(use cancel_token). Awaiting it only waits for the result.
§Example
let cancel = CancellationToken::new();
let final_state = agent.run(
thread_id,
AgentInput::Text("Hello".to_string()),
tool_ctx,
cancel.clone(),
).await?;
match final_state {
AgentRunState::Done { .. } => { /* completed */ }
AgentRunState::Cancelled { .. } => { /* user cancelled */ }
AgentRunState::AwaitingConfirmation { continuation, .. } => {
// Get user decision, then resume:
let state2 = agent.run(
thread_id,
AgentInput::Resume {
continuation,
tool_call_id: id,
confirmed: true,
rejection_reason: None,
},
tool_ctx,
cancel.clone(),
).await?;
}
AgentRunState::Error(e) => { /* handle error */ }
}§Cancellation, timeout, and the dropped JoinHandle
run spawns the agent loop on a Tokio task and returns a future over
the state channel — it drops the task’s
tokio::task::JoinHandle. Dropping a JoinHandle detaches the
task rather than aborting it, so the only ways to stop an in-flight
run are the cancel_token (cooperative) or the per-tool
AgentConfig::tool_timeout_ms
boundary. Callers that need to forcibly abort must use
run_abortable and keep the handle.
Because the handle is dropped, a tool that holds a subprocess open
must obey the
cooperative-cancel contract
(kill_on_drop or a token-aware kill) or the subprocess will
outlive the cancelled / timed-out run. A debug! is logged here so
the detach is visible when chasing a leaked subprocess.
§Errors
Returns an error only if the spawned run task is dropped before it can
report a final state (e.g. a runtime shutdown). A panic inside the run
is caught and surfaced as AgentRunState::Error, not as an Err.
Sourcepub fn run_with_options(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
run_options: RunOptions,
) -> impl Future<Output = Result<AgentRunState>> + Send + 'staticwhere
Ctx: Clone,
pub fn run_with_options(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
run_options: RunOptions,
) -> impl Future<Output = Result<AgentRunState>> + Send + 'staticwhere
Ctx: Clone,
Like run, but with caller-supplied trace metadata.
Equivalent to run except that the supplied RunOptions
configure session/user IDs (propagated as session.id /
user.id baggage), langfuse.trace.{name,tags,metadata.*, input,output}, langfuse.{release,environment}, and the
trace-text truncation ceiling.
Use this instead of run whenever the consumer needs the
SDK to populate Langfuse trace metadata; run itself
continues to delegate here with RunOptions::default().
§Errors
See run.
Sourcepub fn run_abortable(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
) -> (Receiver<AgentRunState>, JoinHandle<()>)where
Ctx: Clone,
pub fn run_abortable(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
) -> (Receiver<AgentRunState>, JoinHandle<()>)where
Ctx: Clone,
Like run, but also returns the tokio::task::JoinHandle for the
spawned task.
Callers that need to forcibly abort the agent loop (e.g. subagent
timeout) can call tokio::task::JoinHandle::abort on the returned handle.
Aborting the handle drops the in-flight LLM stream immediately
instead of waiting for the current turn to finish.
Sourcepub fn run_abortable_with_options(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
run_options: RunOptions,
) -> (Receiver<AgentRunState>, JoinHandle<()>)where
Ctx: Clone,
pub fn run_abortable_with_options(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
run_options: RunOptions,
) -> (Receiver<AgentRunState>, JoinHandle<()>)where
Ctx: Clone,
Like run_abortable, but with
caller-supplied trace metadata. See run_with_options.
Sourcepub fn run_persistent(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
) -> AgentHandlewhere
Ctx: Clone,
pub fn run_persistent(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
) -> AgentHandlewhere
Ctx: Clone,
Run the agent with a persistent input channel.
Unlike Self::run, this returns an AgentHandle that allows the caller
to inject new user messages into the running agent via input_tx.
The agent will process the initial input, then wait for new messages
on the channel between turns instead of exiting on Done.
The agent exits when:
- The
input_txsender is dropped (no more messages) - The
cancel_tokenis cancelled - Max turns exceeded
Sourcepub fn run_persistent_with_options(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
run_options: RunOptions,
) -> AgentHandlewhere
Ctx: Clone,
pub fn run_persistent_with_options(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
run_options: RunOptions,
) -> AgentHandlewhere
Ctx: Clone,
Like run_persistent, but with
caller-supplied trace metadata. See
run_with_options.
Sourcepub async fn run_turn(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
options: TurnOptions,
) -> TurnOutcomewhere
Ctx: Clone,
pub async fn run_turn(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
options: TurnOptions,
) -> TurnOutcomewhere
Ctx: Clone,
Run a single turn of the agent loop — the authoritative server boundary.
Unlike run(), this method executes exactly one turn directly in the
caller’s task (no tokio::spawn) and returns the result inline. This
enables external orchestration where each turn can be dispatched as a
separate message (e.g., via Artemis or another message queue).
When the cancel_token is cancelled, the turn will be aborted before
starting execution and return TurnOutcome::Cancelled.
§Arguments
thread_id- The thread identifier for this conversationinput- Text to start, Resume after confirmation, or Continue after a turntool_context- Context passed to toolscancel_token- Token to signal cancellation from outsideoptions- Execution options (tool runtime strategy, durability)
§Returns
A crate::types::TurnOutcome returned only after the configured event store’s
finish_turn(thread_id, turn) barrier has completed.
Every variant except crate::types::TurnOutcome::Error carries a
structured crate::types::TurnSummary in the summary field. This
summary is the authoritative server-facing outcome contract —
it contains the provider/model provenance, response ID, stop reason,
tool-call count, duration, and execution options for the turn. Server
code should read from summary rather than the legacy per-variant
fields (total_turns, input_tokens, output_tokens, …), which are
retained only for backwards compatibility with local callers.
§Turn Outcomes
NeedsMoreTurns- Turn completed, call again withAgentInput::ContinueDone- Agent completed successfullyAwaitingConfirmation- Tool needs confirmation, call again withAgentInput::ResumePendingToolCalls- Tools need external execution (only withToolRuntime::External)Cancelled- Turn was cancelled via the tokenError- An error occurred (no summary attached)
§Example
use std::sync::Arc;
use agent_sdk::{InMemoryEventStore, TurnOptions};
let cancel = CancellationToken::new();
let event_store = Arc::new(InMemoryEventStore::new());
let outcome = agent.run_turn(
thread_id.clone(),
AgentInput::Text("What is 2+2?".to_string()),
tool_ctx.clone(),
cancel,
TurnOptions::default(),
).await;
let events = event_store.get_events(&thread_id).await?;
// Read server-facing metadata from the TurnSummary.
if let Some(summary) = outcome.summary() {
println!(
"turn={} provider={} model={} stop={:?} response_id={:?}",
summary.turn,
summary.provenance.provider,
summary.provenance.model,
summary.stop_reason,
summary.response_id,
);
}
// Branch on the variant for flow control.
match outcome {
TurnOutcome::NeedsMoreTurns { turn, .. } => {
// Dispatch another message to continue
}
TurnOutcome::Done { .. } => {
// Conversation complete
}
TurnOutcome::PendingToolCalls { tool_calls, .. } => {
// Execute tools externally, then call run_turn with Continue
}
_ => {}
}Sourcepub async fn run_turn_with_options(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
turn_options: TurnOptions,
run_options: RunOptions,
) -> TurnOutcomewhere
Ctx: Clone,
pub async fn run_turn_with_options(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
turn_options: TurnOptions,
run_options: RunOptions,
) -> TurnOutcomewhere
Ctx: Clone,
Like run_turn, but with caller-supplied
trace metadata.
See run_with_options for the full
RunOptions contract. The turn_options parameter retains
its existing semantics (tool runtime / strict durability);
run_options is layered on top to populate Langfuse trace
metadata on the root invoke_agent span.
Source§impl<P, H, M, S> AgentLoop<(), P, H, M, S>where
P: LlmProvider + 'static,
H: AgentHooks + 'static,
M: MessageStore + 'static,
S: StateStore + 'static,
High-level convenience API for agents whose tools take no application
context (Ctx = ()).
impl<P, H, M, S> AgentLoop<(), P, H, M, S>where
P: LlmProvider + 'static,
H: AgentHooks + 'static,
M: MessageStore + 'static,
S: StateStore + 'static,
High-level convenience API for agents whose tools take no application
context (Ctx = ()).
ask and send collapse the four pieces of
ceremony in the low-level run path — constructing a
ToolContext::new(()), creating a
CancellationToken, awaiting the run, and reassembling the assistant
text out of the event store — into a single call that returns a String.
Reach for run or run_turn when you need
application context, cancellation, confirmation flow, or access to the raw
AgentRunState.
Sourcepub async fn ask(
&self,
thread_id: ThreadId,
text: impl Into<String>,
) -> Result<String>
pub async fn ask( &self, thread_id: ThreadId, text: impl Into<String>, ) -> Result<String>
Ask the agent a question and return its assembled reply.
This is the 30-second on-ramp: it builds a fresh
ToolContext::new(()) and a
CancellationToken internally, runs the agent to completion, and
returns the assistant text emitted during this call concatenated into
one String.
For confirmation flows, application context, or explicit cancellation,
use run directly.
§Errors
Returns an error if the run task is dropped before reporting a state,
if the run ends in AgentRunState::Error, or if the event store
cannot be read back.
Sourcepub async fn send(
&self,
thread_id: ThreadId,
input: AgentInput,
) -> Result<String>
pub async fn send( &self, thread_id: ThreadId, input: AgentInput, ) -> Result<String>
Send an AgentInput to the agent and return its assembled reply.
Like ask but accepts a full AgentInput (e.g. to
resume after confirmation). Builds the
ToolContext and CancellationToken
internally and returns the assistant text emitted during this call.
§Errors
Returns an error if the run task is dropped before reporting a state,
if the run ends in AgentRunState::Error, or if the event store
cannot be read back.