Struct AgentLoop 

pub struct AgentLoop<Ctx, P, H, M, S> { /* private fields */ }

The main agent loop that orchestrates LLM calls and tool execution.

AgentLoop is the core component that:

  • Manages conversation state via message and state stores
  • Calls the LLM provider and processes responses
  • Executes tools through the tool registry
  • Persists events to the configured event store
  • Enforces hooks for tool permissions and lifecycle events

§Type Parameters

  • Ctx: Application-specific context passed to tools (e.g., user ID, database)
  • P: The LLM provider implementation
  • H: The hooks implementation for lifecycle customization
  • M: The message store implementation
  • S: The state store implementation

§Event Storage

Every loop instance requires an EventStore configured at construction time. Events are written to that store for the entire lifecycle of the loop, and callers read them back from the store instead of receiving an in-process channel from the runtime.
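
The write-then-read-back pattern can be sketched with a small in-memory stand-in. This is a minimal model, not the SDK's EventStore trait: MemoryEventStore, Event, append, simulate_run and the string thread ID are hypothetical names chosen for illustration, and the model is synchronous where the real store is async.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an event emitted by the loop.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    Text(String),
    ToolCall(String),
    Done,
}

/// Hypothetical in-memory store keyed by thread ID.
#[derive(Default)]
pub struct MemoryEventStore {
    events: Mutex<HashMap<String, Vec<Event>>>,
}

impl MemoryEventStore {
    /// Called by the modelled loop while a run is in progress.
    pub fn append(&self, thread_id: &str, event: Event) {
        let mut guard = self.events.lock().unwrap();
        guard.entry(thread_id.to_string()).or_default().push(event);
    }

    /// Called by the caller to read a thread's events back.
    pub fn get_events(&self, thread_id: &str) -> Vec<Event> {
        let guard = self.events.lock().unwrap();
        guard.get(thread_id).cloned().unwrap_or_default()
    }
}

/// Models a run: the loop holds one handle to the store and writes to it,
/// while the caller keeps another handle to the same store.
pub fn simulate_run(store: Arc<MemoryEventStore>, thread_id: &str) {
    store.append(thread_id, Event::Text("Hello!".to_string()));
    store.append(thread_id, Event::Done);
}
```

The point of the shared Arc is that no channel is handed back to the caller: whoever holds a handle to the store can read the thread's events at any time.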

§Running the Agent

let cancel = CancellationToken::new();
let final_state = agent.run(
    thread_id.clone(),
    AgentInput::Text("Hello!".to_string()),
    tool_ctx,
    cancel,
);
let state = final_state.await?;
let events = event_store.get_events(&thread_id).await?;

Implementations§

impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
where Ctx: Send + Sync + 'static, P: LlmProvider + 'static, H: AgentHooks + 'static, M: MessageStore + 'static, S: StateStore + 'static,

pub fn new( provider: P, tools: ToolRegistry<Ctx>, hooks: H, message_store: M, state_store: S, event_store: Arc<dyn EventStore>, config: AgentConfig, ) -> Self

Create a new agent loop with all components specified directly.

pub fn with_compaction( provider: P, tools: ToolRegistry<Ctx>, hooks: H, message_store: M, state_store: S, event_store: Arc<dyn EventStore>, config: AgentLoopCompactionConfig, ) -> Self

Create a new agent loop with compaction enabled.

pub fn with_audit_sink(self, sink: impl ToolAuditSink + 'static) -> Self

Set the authoritative tool audit sink.

When set, the loop emits a ToolAuditRecord at every tool-lifecycle transition (blocked, requires-confirmation, cached, replayed, invalidated, completed, persistence-failed).

The default is NoopAuditSink which discards every record — suitable for local/CLI usage. Servers should swap in a durable sink.
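
The swap from a discarding default to a recording sink can be modelled as follows. This is a sketch under stated assumptions: AuditSink, AuditRecord, NoopSink, MemorySink and ModelLoop are hypothetical stand-ins, and the real ToolAuditSink trait and ToolAuditRecord type may have a different shape.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, simplified audit record: a tool name and a lifecycle stage.
#[derive(Debug, Clone, PartialEq)]
pub struct AuditRecord {
    pub tool: String,
    pub stage: &'static str,
}

/// Hypothetical sink trait; the real ToolAuditSink may look different.
pub trait AuditSink: Send + Sync {
    fn record(&self, record: AuditRecord);
}

/// Discards every record, like the default described above.
pub struct NoopSink;

impl AuditSink for NoopSink {
    fn record(&self, _record: AuditRecord) {}
}

/// Keeps records in memory; a server would write to durable storage instead.
#[derive(Clone, Default)]
pub struct MemorySink(pub Arc<Mutex<Vec<AuditRecord>>>);

impl AuditSink for MemorySink {
    fn record(&self, record: AuditRecord) {
        self.0.lock().unwrap().push(record);
    }
}

/// Minimal model of a loop that owns a sink and reports tool transitions.
pub struct ModelLoop {
    sink: Box<dyn AuditSink>,
}

impl ModelLoop {
    /// Starts with the discarding sink.
    pub fn new() -> Self {
        Self { sink: Box::new(NoopSink) }
    }

    /// Builder-style setter mirroring the shape of with_audit_sink.
    pub fn with_audit_sink(mut self, sink: impl AuditSink + 'static) -> Self {
        self.sink = Box::new(sink);
        self
    }

    /// A real loop would run the tool here; the model only audits.
    pub fn execute_tool(&self, tool: &str) {
        self.sink.record(AuditRecord { tool: tool.to_string(), stage: "completed" });
    }
}
```

Because the setter consumes and returns self, it chains directly onto construction, for example ModelLoop::new().with_audit_sink(MemorySink::default()).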

pub fn with_observability_store( self, store: impl ObservabilityStore + 'static, ) -> Self

Available on crate feature otel only.

Set the observability store for GenAI payload capture.

When set, the store is called at each LLM request boundary to decide whether payloads are inlined on spans, externalized, or omitted.

pub fn run( &self, thread_id: ThreadId, input: AgentInput, tool_context: ToolContext<Ctx>, cancel_token: CancellationToken, ) -> impl Future<Output = Result<AgentRunState>> + Send + 'static
where Ctx: Clone,

Run the agent loop.

This method allows the agent to pause when a tool requires confirmation, returning an AgentRunState::AwaitingConfirmation that contains the state needed to resume.

When the cancel_token is cancelled, the agent will stop after the current turn completes (no new turns will start). The final state will be AgentRunState::Cancelled.

§Arguments
  • thread_id - The thread identifier for this conversation
  • input - Either a new text message or a resume with a confirmation decision
  • tool_context - Context passed to tools
  • cancel_token - Token to signal cancellation from outside
§Returns

A future that resolves to the final AgentRunState. Awaiting it waits for the run to complete:

let final_state = agent.run(thread_id, input, tool_ctx, cancel).await?;

The future is 'static — the run is already spawned on a Tokio task before this returns, so dropping the future does not stop the run (use cancel_token). Awaiting it only waits for the result.

§Example
let cancel = CancellationToken::new();
let final_state = agent.run(
    thread_id.clone(),
    AgentInput::Text("Hello".to_string()),
    tool_ctx.clone(),
    cancel.clone(),
).await?;

match final_state {
    AgentRunState::Done { .. } => { /* completed */ }
    AgentRunState::Cancelled { .. } => { /* user cancelled */ }
    AgentRunState::AwaitingConfirmation { continuation, .. } => {
        // Get the user's decision, then resume. `id` is the ID of the
        // tool call that is awaiting confirmation.
        let state2 = agent.run(
            thread_id,
            AgentInput::Resume {
                continuation,
                tool_call_id: id,
                confirmed: true,
                rejection_reason: None,
            },
            tool_ctx,
            cancel.clone(),
        ).await?;
    }
    AgentRunState::Error(e) => { /* handle error */ }
}
§Cancellation, timeout, and the dropped JoinHandle

run spawns the agent loop on a Tokio task and returns a future over the state channel — it drops the task’s tokio::task::JoinHandle. Dropping a JoinHandle detaches the task rather than aborting it, so the only ways to stop an in-flight run are the cancel_token (cooperative) or the per-tool AgentConfig::tool_timeout_ms boundary. Callers that need to forcibly abort must use run_abortable and keep the handle.

Because the handle is dropped, a tool that holds a subprocess open must obey the cooperative-cancel contract (kill_on_drop or a token-aware kill); otherwise the subprocess will outlive the cancelled or timed-out run. A debug! message is logged at the point of detachment so the detach is visible when chasing a leaked subprocess.
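
The detach-and-cooperate behavior can be modelled with the standard library alone. The sketch below substitutes std::thread for a Tokio task and an AtomicBool for the CancellationToken; both substitutions are assumptions made to keep the example self-contained, and spawn_run and RunState are hypothetical names. Like a Tokio JoinHandle, a dropped std::thread::JoinHandle detaches its thread instead of stopping it, so the flag is the only way to end the modelled run early.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the final state of a run.
#[derive(Debug, PartialEq)]
pub enum RunState {
    Done { turns: u32 },
    Cancelled { turns: u32 },
}

/// Spawns the modelled run and returns only the result channel.
/// The JoinHandle is dropped on purpose, which detaches the thread.
pub fn spawn_run(cancel: Arc<AtomicBool>, max_turns: u32) -> Receiver<RunState> {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut turns = 0;
        while turns < max_turns {
            // Cancellation is only checked between turns, so a turn that
            // has started always finishes.
            if cancel.load(Ordering::SeqCst) {
                let _ = tx.send(RunState::Cancelled { turns });
                return;
            }
            thread::sleep(Duration::from_millis(5)); // one "turn" of work
            turns += 1;
        }
        let _ = tx.send(RunState::Done { turns });
    });
    drop(handle); // detach: the thread keeps running without the handle
    rx
}
```

Dropping the returned Receiver in this model has the same effect as dropping the future returned by run: the worker keeps going, and its final send is simply ignored.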

§Errors

Returns an error only if the spawned run task is dropped before it can report a final state (e.g. a runtime shutdown). A panic inside the run is caught and surfaced as AgentRunState::Error, not as an Err.

pub fn run_with_options( &self, thread_id: ThreadId, input: AgentInput, tool_context: ToolContext<Ctx>, cancel_token: CancellationToken, run_options: RunOptions, ) -> impl Future<Output = Result<AgentRunState>> + Send + 'static
where Ctx: Clone,

Like run, but with caller-supplied trace metadata.

Equivalent to run except that the supplied RunOptions configure session/user IDs (propagated as session.id / user.id baggage), langfuse.trace.{name,tags,metadata.*,input,output}, langfuse.{release,environment}, and the trace-text truncation ceiling.

Use this instead of run whenever the consumer needs the SDK to populate Langfuse trace metadata; run itself continues to delegate here with RunOptions::default().

§Errors

See run.

pub fn run_abortable( &self, thread_id: ThreadId, input: AgentInput, tool_context: ToolContext<Ctx>, cancel_token: CancellationToken, ) -> (Receiver<AgentRunState>, JoinHandle<()>)
where Ctx: Clone,

Like run, but also returns the tokio::task::JoinHandle for the spawned task.

Callers that need to forcibly abort the agent loop (e.g. subagent timeout) can call tokio::task::JoinHandle::abort on the returned handle. Aborting the handle drops the in-flight LLM stream immediately instead of waiting for the current turn to finish.

pub fn run_abortable_with_options( &self, thread_id: ThreadId, input: AgentInput, tool_context: ToolContext<Ctx>, cancel_token: CancellationToken, run_options: RunOptions, ) -> (Receiver<AgentRunState>, JoinHandle<()>)
where Ctx: Clone,

Like run_abortable, but with caller-supplied trace metadata. See run_with_options.

pub fn run_persistent( &self, thread_id: ThreadId, input: AgentInput, tool_context: ToolContext<Ctx>, cancel_token: CancellationToken, ) -> AgentHandle
where Ctx: Clone,

Run the agent with a persistent input channel.

Unlike run, this returns an AgentHandle that allows the caller to inject new user messages into the running agent via input_tx. The agent processes the initial input, then waits for new messages on the channel between turns instead of exiting on Done.

The agent exits when:

  • The input_tx sender is dropped (no more messages)
  • The cancel_token is cancelled
  • Max turns exceeded
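
The three exit conditions can be modelled with a standard-library channel. This is a sketch, not the SDK's implementation: Handle, ExitReason and this free-standing run_persistent are hypothetical stand-ins, an AtomicBool replaces the CancellationToken, one message counts as one turn, and cancellation is only observed between messages.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Why the modelled agent stopped.
#[derive(Debug, PartialEq)]
pub enum ExitReason {
    InputClosed,
    Cancelled,
    MaxTurns,
}

/// Hypothetical stand-in for AgentHandle: an input sender plus the worker.
pub struct Handle {
    pub input_tx: Sender<String>,
    pub worker: JoinHandle<(Vec<String>, ExitReason)>,
}

pub fn run_persistent(initial: String, cancel: Arc<AtomicBool>, max_turns: usize) -> Handle {
    let (input_tx, input_rx) = mpsc::channel::<String>();
    let worker = thread::spawn(move || {
        let mut processed = Vec::new();
        let mut next = Some(initial);
        loop {
            if cancel.load(Ordering::SeqCst) {
                return (processed, ExitReason::Cancelled);
            }
            if processed.len() >= max_turns {
                return (processed, ExitReason::MaxTurns);
            }
            // Process the initial input first, then block on the channel
            // between turns instead of exiting.
            let message = match next.take() {
                Some(m) => m,
                None => match input_rx.recv() {
                    Ok(m) => m,
                    // recv fails once every sender has been dropped.
                    Err(_) => return (processed, ExitReason::InputClosed),
                },
            };
            processed.push(message);
        }
    });
    Handle { input_tx, worker }
}
```

Messages sent before the sender is dropped are still delivered, so dropping input_tx acts as a graceful "no more input" signal and does not discard queued messages.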

pub fn run_persistent_with_options( &self, thread_id: ThreadId, input: AgentInput, tool_context: ToolContext<Ctx>, cancel_token: CancellationToken, run_options: RunOptions, ) -> AgentHandle
where Ctx: Clone,

Like run_persistent, but with caller-supplied trace metadata. See run_with_options.

pub async fn run_turn( &self, thread_id: ThreadId, input: AgentInput, tool_context: ToolContext<Ctx>, cancel_token: CancellationToken, options: TurnOptions, ) -> TurnOutcome
where Ctx: Clone,

Run a single turn of the agent loop — the authoritative server boundary.

Unlike run(), this method executes exactly one turn directly in the caller’s task (no tokio::spawn) and returns the result inline. This enables external orchestration where each turn can be dispatched as a separate message (e.g., via Artemis or another message queue).

When the cancel_token is cancelled, the turn will be aborted before starting execution and return TurnOutcome::Cancelled.

§Arguments
  • thread_id - The thread identifier for this conversation
  • input - Text to start, Resume after confirmation, or Continue after a turn
  • tool_context - Context passed to tools
  • cancel_token - Token to signal cancellation from outside
  • options - Execution options (tool runtime strategy, durability)
§Returns

A TurnOutcome, returned only after the configured event store’s finish_turn(thread_id, turn) barrier has completed.

Every variant except TurnOutcome::Error carries a structured TurnSummary in the summary field. This summary is the authoritative server-facing outcome contract: it contains the provider/model provenance, response ID, stop reason, tool-call count, duration, and execution options for the turn. Server code should read from summary rather than the legacy per-variant fields (total_turns, input_tokens, output_tokens, …), which are retained only for backwards compatibility with local callers.

§Turn Outcomes
  • NeedsMoreTurns - Turn completed, call again with AgentInput::Continue
  • Done - Agent completed successfully
  • AwaitingConfirmation - Tool needs confirmation, call again with AgentInput::Resume
  • PendingToolCalls - Tools need external execution (only with ToolRuntime::External)
  • Cancelled - Turn was cancelled via the token
  • Error - An error occurred (no summary attached)
§Example
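use std::sync::Arc;
use agent_sdk::{InMemoryEventStore, TurnOptions};

// `event_store` must be the same store that was passed to the AgentLoop
// constructor; a store that was never given to the agent stays empty.
let event_store = Arc::new(InMemoryEventStore::new());

let cancel = CancellationToken::new();
let outcome = agent.run_turn(
    thread_id.clone(),
    AgentInput::Text("What is 2+2?".to_string()),
    tool_ctx.clone(),
    cancel,
    TurnOptions::default(),
).await;

// The turn's events are durable once run_turn has returned.
let events = event_store.get_events(&thread_id).await?;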

// Read server-facing metadata from the TurnSummary.
if let Some(summary) = outcome.summary() {
    println!(
        "turn={} provider={} model={} stop={:?} response_id={:?}",
        summary.turn,
        summary.provenance.provider,
        summary.provenance.model,
        summary.stop_reason,
        summary.response_id,
    );
}

// Branch on the variant for flow control.
match outcome {
    TurnOutcome::NeedsMoreTurns { turn, .. } => {
        // Dispatch another message to continue
    }
    TurnOutcome::Done { .. } => {
        // Conversation complete
    }
    TurnOutcome::PendingToolCalls { tool_calls, .. } => {
        // Execute tools externally, then call run_turn with Continue
    }
    _ => {}
}
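
For external orchestration, the mapping from each outcome to the next input can be written as a small state machine. The sketch below is a self-contained model: Input and Outcome are hypothetical, payload-free stand-ins for AgentInput and TurnOutcome, and the run_turn closure plays the role of the real method.

```rust
/// Hypothetical stand-in for AgentInput (payloads omitted).
#[derive(Debug, Clone, PartialEq)]
pub enum Input {
    Text(String),
    Continue,
    Resume { confirmed: bool },
}

/// Hypothetical stand-in for TurnOutcome (payloads omitted).
#[derive(Debug, Clone, PartialEq)]
pub enum Outcome {
    NeedsMoreTurns,
    AwaitingConfirmation,
    PendingToolCalls,
    Done,
    Cancelled,
    Error(String),
}

/// Maps an outcome to the input for the next turn,
/// or None when the conversation has reached a terminal state.
pub fn next_input(outcome: &Outcome, user_confirms: bool) -> Option<Input> {
    match outcome {
        Outcome::NeedsMoreTurns => Some(Input::Continue),
        Outcome::AwaitingConfirmation => Some(Input::Resume { confirmed: user_confirms }),
        // Tools ran outside the loop; continue once their results are stored.
        Outcome::PendingToolCalls => Some(Input::Continue),
        Outcome::Done | Outcome::Cancelled | Outcome::Error(_) => None,
    }
}

/// Drives turns until a terminal outcome and returns it with the turn count.
pub fn drive<F>(first: Input, mut run_turn: F, user_confirms: bool) -> (Outcome, usize)
where
    F: FnMut(&Input) -> Outcome,
{
    let mut input = first;
    let mut turns = 0;
    loop {
        let outcome = run_turn(&input);
        turns += 1;
        match next_input(&outcome, user_confirms) {
            Some(next) => input = next,
            None => return (outcome, turns),
        }
    }
}
```

In a queue-based deployment the loop in drive would not exist as such: each iteration would be a separate message whose handler calls run_turn once and enqueues the input returned by next_input.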

pub async fn run_turn_with_options( &self, thread_id: ThreadId, input: AgentInput, tool_context: ToolContext<Ctx>, cancel_token: CancellationToken, turn_options: TurnOptions, run_options: RunOptions, ) -> TurnOutcome
where Ctx: Clone,

Like run_turn, but with caller-supplied trace metadata.

See run_with_options for the full RunOptions contract. The turn_options parameter retains its existing semantics (tool runtime / strict durability); run_options is layered on top to populate Langfuse trace metadata on the root invoke_agent span.

impl<P, H, M, S> AgentLoop<(), P, H, M, S>
where P: LlmProvider + 'static, H: AgentHooks + 'static, M: MessageStore + 'static, S: StateStore + 'static,

High-level convenience API for agents whose tools take no application context (Ctx = ()).

ask and send collapse the four pieces of ceremony in the low-level run path — constructing a ToolContext::new(()), creating a CancellationToken, awaiting the run, and reassembling the assistant text out of the event store — into a single call that returns a String.

Reach for run or run_turn when you need application context, cancellation, confirmation flow, or access to the raw AgentRunState.

pub async fn ask( &self, thread_id: ThreadId, text: impl Into<String>, ) -> Result<String>

Ask the agent a question and return its assembled reply.

This is the 30-second on-ramp: it builds a fresh ToolContext::new(()) and a CancellationToken internally, runs the agent to completion, and returns the assistant text emitted during this call concatenated into one String.
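
One way to picture "the assistant text emitted during this call" is to remember how many events the thread had before the call and concatenate only the assistant text appended afterwards. This is an assumption about the approach, not a description of the SDK's internals; LoggedEvent and assemble_reply are hypothetical names.

```rust
/// Hypothetical stand-in for events read back from the event store.
#[derive(Debug, Clone, PartialEq)]
pub enum LoggedEvent {
    UserText(String),
    AssistantText(String),
    ToolCall(String),
}

/// Concatenates assistant text from events at index `start` or later,
/// where `start` is the event count observed before the call began.
pub fn assemble_reply(events: &[LoggedEvent], start: usize) -> String {
    events
        .iter()
        .skip(start)
        .filter_map(|event| match event {
            LoggedEvent::AssistantText(text) => Some(text.as_str()),
            _ => None,
        })
        .collect()
}
```

Text from earlier calls on the same thread is skipped by the start offset, and non-text events such as tool calls are filtered out.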

For confirmation flows, application context, or explicit cancellation, use run directly.

§Errors

Returns an error if the run task is dropped before reporting a state, if the run ends in AgentRunState::Error, or if the event store cannot be read back.

pub async fn send( &self, thread_id: ThreadId, input: AgentInput, ) -> Result<String>

Send an AgentInput to the agent and return its assembled reply.

Like ask but accepts a full AgentInput (e.g. to resume after confirmation). Builds the ToolContext and CancellationToken internally and returns the assistant text emitted during this call.

§Errors

Returns an error if the run task is dropped before reporting a state, if the run ends in AgentRunState::Error, or if the event store cannot be read back.

Auto Trait Implementations§

impl<Ctx, P, H, M, S> !RefUnwindSafe for AgentLoop<Ctx, P, H, M, S>

impl<Ctx, P, H, M, S> !UnwindSafe for AgentLoop<Ctx, P, H, M, S>

impl<Ctx, P, H, M, S> Freeze for AgentLoop<Ctx, P, H, M, S>

impl<Ctx, P, H, M, S> Send for AgentLoop<Ctx, P, H, M, S>

impl<Ctx, P, H, M, S> Sync for AgentLoop<Ctx, P, H, M, S>

impl<Ctx, P, H, M, S> Unpin for AgentLoop<Ctx, P, H, M, S>

impl<Ctx, P, H, M, S> UnsafeUnpin for AgentLoop<Ctx, P, H, M, S>
