Skip to main content

agent_sdk/
agent_loop.rs

//! Agent loop orchestration module.
//!
//! This module contains the core agent loop that orchestrates LLM calls,
//! tool execution, and event handling. The agent loop is the main entry point
//! for running an AI agent.
//!
//! # Architecture
//!
//! The agent loop works as follows:
//! 1. Receives a user message
//! 2. Sends the message to the LLM provider
//! 3. Processes the LLM response (text or tool calls)
//! 4. If tool calls are present, executes them and feeds results back to LLM
//! 5. Repeats until the LLM responds with only text (no tool calls)
//! 6. Persists events throughout to the configured event store
//!
//! # Building an Agent
//!
//! Use the builder pattern via [`builder()`] or [`AgentLoopBuilder`]:
//!
//! ```ignore
//! use agent_sdk::{builder, providers::AnthropicProvider};
//!
//! let agent = builder()
//!     .provider(AnthropicProvider::sonnet(api_key))
//!     .tools(my_tools)
//!     .event_store(event_store)
//!     .build();
//! ```
30
31mod builder;
32mod helpers;
33mod idempotency;
34mod listen;
35mod llm;
36mod run_loop;
37#[cfg(test)]
38mod test_utils;
39#[cfg(test)]
40mod tests;
41mod tool_execution;
42mod turn;
43mod types;
44
45use self::run_loop::{run_loop, run_single_turn};
46use self::types::{RunLoopParameters, TurnParameters};
47use crate::types::TurnOptions;
48
49pub use self::builder::AgentLoopBuilder;
50
51use crate::authority::{EventAuthority, LocalEventAuthority};
52use crate::context::{CompactionConfig, ContextCompactor};
53use crate::hooks::AgentHooks;
54use crate::llm::LlmProvider;
55use crate::stores::{EventStore, MessageStore, StateStore, ToolExecutionStore};
56use crate::tools::{ToolContext, ToolRegistry};
57use crate::types::{AgentConfig, AgentError, AgentInput, AgentRunState, RunOptions, ThreadId};
58use futures::FutureExt;
59use std::future::Future;
60use std::panic::AssertUnwindSafe;
61use std::sync::Arc;
62use tokio::sync::{mpsc, oneshot};
63use tokio_util::sync::CancellationToken;
64
65/// Run the agent loop with panic isolation at the spawned-task boundary.
66///
67/// A panic anywhere inside the run loop — most importantly inside the
68/// LLM provider call or the compaction provider call, neither of which
69/// is otherwise guarded — would unwind the spawned task, drop the
70/// `state_tx` oneshot, and surface to the caller as an opaque
71/// [`oneshot::error::RecvError`] (and, for subagents, be misclassified
72/// as `Disconnected`). Catching the unwind here turns it into a
73/// structured [`AgentRunState::Error`] that the task can still send on
74/// `state_tx`, so the run ends observably rather than silently tearing
75/// down the channel.
76///
77/// `AssertUnwindSafe` is sound at this boundary: the run loop owns its
78/// `RunLoopParameters` by value, so nothing it touches outlives the
79/// caught panic and is observed afterwards. The only value produced is
80/// the returned `AgentRunState`. Tool-level panics are already caught
81/// closer to the tool boundary (see
82/// `agent_loop::helpers::catch_tool_panic`) so the assistant
83/// `tool_use` / `tool_result` history stays balanced; this guard is the
84/// outer safety net for everything else.
85async fn run_loop_isolated<Ctx, P, H, M, S>(
86    params: RunLoopParameters<Ctx, P, H, M, S>,
87) -> AgentRunState
88where
89    Ctx: Send + Sync + Clone + 'static,
90    P: LlmProvider,
91    H: AgentHooks,
92    M: MessageStore,
93    S: StateStore,
94{
95    match AssertUnwindSafe(run_loop(params)).catch_unwind().await {
96        Ok(state) => state,
97        Err(payload) => {
98            let message = self::helpers::panic_payload_message(payload.as_ref());
99            log::error!("agent run loop panicked: {message}");
100            AgentRunState::Error(AgentError::new(
101                format!("Agent run panicked: {message}"),
102                false,
103            ))
104        }
105    }
106}
107
108/// Drop a spawned run task's [`tokio::task::JoinHandle`], logging a
109/// `debug!` to make the detach visible.
110///
111/// `run` / `run_with_options` intentionally drop the handle: the run is
112/// stopped through the cancel token or the per-tool timeout, not by
113/// aborting the task. Surfacing the detach at `debug` level gives a
114/// breadcrumb when a subprocess-backed tool that ignores the
115/// cooperative-cancel contract leaks a process after cancellation.
116fn warn_on_detached_run_handle(handle: tokio::task::JoinHandle<()>) {
117    log::debug!(
118        "agent run JoinHandle dropped (task detached); the run can only be \
119         stopped via its cancel token or per-tool timeout. Subprocess-backed \
120         tools must honour kill_on_drop or a token-aware kill to avoid leaks"
121    );
122    drop(handle);
123}
124
125/// Await a run's state receiver, mapping a dropped channel to an `anyhow`
126/// error so `run`/`run_with_options` can present an `impl Future` instead of
127/// a bare [`oneshot::Receiver`].
128///
129/// A panic inside the run is already converted to
130/// [`AgentRunState::Error`] before the channel send (see
131/// [`run_loop_isolated`]), so the only way `recv` fails is the sender being
132/// dropped without sending — a runtime shutdown rather than an agent error.
133async fn recv_run_state(
134    state_rx: oneshot::Receiver<AgentRunState>,
135) -> anyhow::Result<AgentRunState> {
136    state_rx
137        .await
138        .map_err(|_| anyhow::anyhow!("agent run task was dropped before reporting a final state"))
139}
140
141/// Handle to a persistent agent thread.
142///
143/// Returned by [`AgentLoop::run_persistent`]. Allows the caller to send
144/// new messages to the running agent and cancel execution.
145pub struct AgentHandle {
146    /// Send new messages to the running agent. The agent processes them as new
147    /// user turns once it parks between turns.
148    ///
149    /// Only [`AgentInput::Text`] and [`AgentInput::Message`] are supported on
150    /// this channel — they are the only variants that represent a fresh user
151    /// turn. Injecting [`AgentInput::Resume`], [`AgentInput::SubmitToolResults`],
152    /// or [`AgentInput::Continue`] ends the run with [`AgentRunState::Error`]
153    /// (those belong to the single-turn `run_turn` flow, not the persistent
154    /// loop). Dropping the sender ends the run cleanly with `Done`.
155    pub input_tx: mpsc::Sender<AgentInput>,
156    /// Final run state (sent once when the agent completes).
157    pub state_rx: oneshot::Receiver<AgentRunState>,
158    /// Cancel the running agent.
159    pub cancel_token: CancellationToken,
160}
161
162/// Configuration bundle for constructing an [`AgentLoop`] with compaction.
163pub struct AgentLoopCompactionConfig {
164    pub agent_config: AgentConfig,
165    pub compaction_config: CompactionConfig,
166}
167
168impl AgentLoopCompactionConfig {
169    #[must_use]
170    pub const fn new(agent_config: AgentConfig, compaction_config: CompactionConfig) -> Self {
171        Self {
172            agent_config,
173            compaction_config,
174        }
175    }
176}
177
178/// The main agent loop that orchestrates LLM calls and tool execution.
179///
180/// `AgentLoop` is the core component that:
181/// - Manages conversation state via message and state stores
182/// - Calls the LLM provider and processes responses
183/// - Executes tools through the tool registry
184/// - Persists events to the configured event store
185/// - Enforces hooks for tool permissions and lifecycle events
186///
187/// # Type Parameters
188///
189/// - `Ctx`: Application-specific context passed to tools (e.g., user ID, database)
190/// - `P`: The LLM provider implementation
191/// - `H`: The hooks implementation for lifecycle customization
192/// - `M`: The message store implementation
193/// - `S`: The state store implementation
194///
195/// # Event Storage
196///
197/// Every loop instance requires an [`EventStore`] configured at construction
198/// time. Events are written to that store for the entire lifecycle of the loop,
199/// and callers read them back from the store instead of receiving an in-process
200/// channel from the runtime.
201///
202/// # Running the Agent
203///
204/// ```ignore
205/// let final_state = agent.run(
206///     thread_id,
207///     AgentInput::Text("Hello!".to_string()),
208///     tool_ctx,
209/// );
210/// let state = final_state.await?;
211/// let events = event_store.get_events(&thread_id).await?;
212/// ```
213pub struct AgentLoop<Ctx, P, H, M, S>
214where
215    P: LlmProvider,
216    H: AgentHooks,
217    M: MessageStore,
218    S: StateStore,
219{
220    pub(super) provider: Arc<P>,
221    pub(super) tools: Arc<ToolRegistry<Ctx>>,
222    pub(super) hooks: Arc<H>,
223    pub(super) message_store: Arc<M>,
224    pub(super) state_store: Arc<S>,
225    pub(super) event_store: Arc<dyn EventStore>,
226    pub(super) event_authority: Option<Arc<dyn EventAuthority>>,
227    pub(super) config: AgentConfig,
228    pub(super) compaction_config: Option<CompactionConfig>,
229    pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
230    pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
231    pub(super) audit_sink: Arc<dyn crate::hooks::ToolAuditSink>,
232    #[cfg(feature = "otel")]
233    pub(super) observability_store: Option<Arc<dyn crate::observability::ObservabilityStore>>,
234}
235
236/// Create a new builder for constructing an `AgentLoop`.
237#[must_use]
238pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
239    AgentLoopBuilder::new()
240}
241
242impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
243where
244    Ctx: Send + Sync + 'static,
245    P: LlmProvider + 'static,
246    H: AgentHooks + 'static,
247    M: MessageStore + 'static,
248    S: StateStore + 'static,
249{
250    /// Create a new agent loop with all components specified directly.
251    #[must_use]
252    pub fn new(
253        provider: P,
254        tools: ToolRegistry<Ctx>,
255        hooks: H,
256        message_store: M,
257        state_store: S,
258        event_store: Arc<dyn EventStore>,
259        config: AgentConfig,
260    ) -> Self {
261        Self {
262            provider: Arc::new(provider),
263            tools: Arc::new(tools),
264            hooks: Arc::new(hooks),
265            message_store: Arc::new(message_store),
266            state_store: Arc::new(state_store),
267            event_store,
268            event_authority: None,
269            config,
270            compaction_config: None,
271            compactor: None,
272            execution_store: None,
273            audit_sink: Arc::new(crate::hooks::NoopAuditSink),
274            #[cfg(feature = "otel")]
275            observability_store: None,
276        }
277    }
278
279    /// Create a new agent loop with compaction enabled.
280    #[must_use]
281    pub fn with_compaction(
282        provider: P,
283        tools: ToolRegistry<Ctx>,
284        hooks: H,
285        message_store: M,
286        state_store: S,
287        event_store: Arc<dyn EventStore>,
288        config: AgentLoopCompactionConfig,
289    ) -> Self {
290        let AgentLoopCompactionConfig {
291            agent_config,
292            compaction_config,
293        } = config;
294        Self {
295            provider: Arc::new(provider),
296            tools: Arc::new(tools),
297            hooks: Arc::new(hooks),
298            message_store: Arc::new(message_store),
299            state_store: Arc::new(state_store),
300            event_store,
301            event_authority: None,
302            config: agent_config,
303            compaction_config: Some(compaction_config),
304            compactor: None,
305            execution_store: None,
306            audit_sink: Arc::new(crate::hooks::NoopAuditSink),
307            #[cfg(feature = "otel")]
308            observability_store: None,
309        }
310    }
311
312    /// Set the authoritative tool audit sink.
313    ///
314    /// When set, the loop emits a [`ToolAuditRecord`](crate::advanced::ToolAuditRecord)
315    /// at every tool-lifecycle transition (blocked, requires-confirmation,
316    /// cached, replayed, invalidated, completed, persistence-failed).
317    ///
318    /// The default is [`NoopAuditSink`](crate::hooks::NoopAuditSink) which
319    /// discards every record — suitable for local/CLI usage. Servers should
320    /// swap in a durable sink.
321    #[must_use]
322    pub fn with_audit_sink(mut self, sink: impl crate::hooks::ToolAuditSink + 'static) -> Self {
323        self.audit_sink = Arc::new(sink);
324        self
325    }
326
327    /// Set the observability store for `GenAI` payload capture.
328    ///
329    /// When set, the store is called at each LLM request boundary to decide
330    /// whether payloads are inlined on spans, externalized, or omitted.
331    #[cfg(feature = "otel")]
332    #[must_use]
333    pub fn with_observability_store(
334        mut self,
335        store: impl crate::observability::ObservabilityStore + 'static,
336    ) -> Self {
337        self.observability_store = Some(Arc::new(store));
338        self
339    }
340
341    /// Resolve the event authority for this run.
342    ///
343    /// If an external authority was configured via the builder, use it.
344    /// Otherwise create a fresh [`LocalEventAuthority`] that starts at 0
345    /// (the pre-existing local/CLI behaviour).
346    fn resolve_authority(&self) -> Arc<dyn EventAuthority> {
347        self.event_authority
348            .clone()
349            .unwrap_or_else(|| Arc::new(LocalEventAuthority::new()))
350    }
351
352    /// Run the agent loop.
353    ///
354    /// This method allows the agent to pause when a tool requires confirmation,
355    /// returning an `AgentRunState::AwaitingConfirmation` that contains the
356    /// state needed to resume.
357    ///
358    /// When the `cancel_token` is cancelled, the agent interrupts in-flight
359    /// work at the SDK boundary: the LLM stream and any non-streaming LLM call
360    /// are raced against the token (see `agent_loop/llm.rs`), and in-flight
361    /// tool executions are dropped with balanced `ToolResult`s synthesized so
362    /// the conversation history stays consistent. The run then ends with
363    /// `AgentRunState::Cancelled` — cancellation is *not* deferred to a turn
364    /// boundary. Tools that hold OS resources must honour the
365    /// [cooperative-cancel contract](crate::tools::Tool#cooperative-cancellation);
366    /// see the section below.
367    ///
368    /// # Arguments
369    ///
370    /// * `thread_id` - The thread identifier for this conversation
371    /// * `input` - Either a new text message or a resume with confirmation decision
372    /// * `tool_context` - Context passed to tools
373    /// * `cancel_token` - Token to signal cancellation from outside
374    ///
375    /// # Returns
376    ///
377    /// A future that resolves to the final [`AgentRunState`]. Awaiting it
378    /// drives the run to completion:
379    ///
380    /// ```ignore
381    /// let final_state = agent.run(thread_id, input, tool_ctx, cancel).await?;
382    /// ```
383    ///
384    /// The future is `'static` — the run is already spawned on a Tokio task
385    /// before this returns, so dropping the future does **not** stop the run
386    /// (use `cancel_token`). Awaiting it only waits for the result.
387    ///
388    /// # Example
389    ///
390    /// ```ignore
391    /// let cancel = CancellationToken::new();
392    /// let final_state = agent.run(
393    ///     thread_id,
394    ///     AgentInput::Text("Hello".to_string()),
395    ///     tool_ctx,
396    ///     cancel.clone(),
397    /// ).await?;
398    ///
399    /// match final_state {
400    ///     AgentRunState::Done { .. } => { /* completed */ }
401    ///     AgentRunState::Cancelled { .. } => { /* user cancelled */ }
402    ///     AgentRunState::AwaitingConfirmation { continuation, .. } => {
403    ///         // Get user decision, then resume:
404    ///         let state2 = agent.run(
405    ///             thread_id,
406    ///             AgentInput::Resume {
407    ///                 continuation,
408    ///                 tool_call_id: id,
409    ///                 confirmed: true,
410    ///                 rejection_reason: None,
411    ///             },
412    ///             tool_ctx,
413    ///             cancel.clone(),
414    ///         ).await?;
415    ///     }
416    ///     AgentRunState::Error(e) => { /* handle error */ }
417    /// }
418    /// ```
419    /// # Cancellation, timeout, and the dropped `JoinHandle`
420    ///
421    /// `run` spawns the agent loop on a Tokio task and returns a future over
422    /// the state channel — it **drops** the task's
423    /// [`tokio::task::JoinHandle`]. Dropping a `JoinHandle` *detaches* the
424    /// task rather than aborting it, so the only ways to stop an in-flight
425    /// run are the `cancel_token` (cooperative) or the per-tool
426    /// [`AgentConfig::tool_timeout_ms`](crate::types::AgentConfig::tool_timeout_ms)
427    /// boundary. Callers that need to forcibly abort must use
428    /// [`run_abortable`](Self::run_abortable) and keep the handle.
429    ///
430    /// Because the handle is dropped, a tool that holds a subprocess open
431    /// must obey the
432    /// [cooperative-cancel contract](crate::tools::Tool#cooperative-cancellation)
433    /// (`kill_on_drop` or a token-aware `kill`) or the subprocess will
434    /// outlive the cancelled / timed-out run. A `debug!` is logged here so
435    /// the detach is visible when chasing a leaked subprocess.
436    ///
437    /// # Errors
438    ///
439    /// Returns an error only if the spawned run task is dropped before it can
440    /// report a final state (e.g. a runtime shutdown). A panic inside the run
441    /// is caught and surfaced as [`AgentRunState::Error`], not as an `Err`.
442    pub fn run(
443        &self,
444        thread_id: ThreadId,
445        input: AgentInput,
446        tool_context: ToolContext<Ctx>,
447        cancel_token: CancellationToken,
448    ) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
449    where
450        Ctx: Clone,
451    {
452        let (state_rx, handle) = self.run_abortable(thread_id, input, tool_context, cancel_token);
453        warn_on_detached_run_handle(handle);
454        recv_run_state(state_rx)
455    }
456
457    /// Like [`run`](Self::run), but with caller-supplied trace metadata.
458    ///
459    /// Equivalent to `run` except that the supplied [`RunOptions`]
460    /// configure session/user IDs (propagated as `session.id` /
461    /// `user.id` baggage), `langfuse.trace.{name,tags,metadata.*,
462    /// input,output}`, `langfuse.{release,environment}`, and the
463    /// trace-text truncation ceiling.
464    ///
465    /// Use this instead of `run` whenever the consumer needs the
466    /// SDK to populate Langfuse trace metadata; `run` itself
467    /// continues to delegate here with `RunOptions::default()`.
468    ///
469    /// # Errors
470    ///
471    /// See [`run`](Self::run).
472    pub fn run_with_options(
473        &self,
474        thread_id: ThreadId,
475        input: AgentInput,
476        tool_context: ToolContext<Ctx>,
477        cancel_token: CancellationToken,
478        run_options: RunOptions,
479    ) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
480    where
481        Ctx: Clone,
482    {
483        let (state_rx, handle) = self.run_abortable_with_options(
484            thread_id,
485            input,
486            tool_context,
487            cancel_token,
488            run_options,
489        );
490        warn_on_detached_run_handle(handle);
491        recv_run_state(state_rx)
492    }
493
494    /// Like [`run`](Self::run), but also returns the [`tokio::task::JoinHandle`] for the
495    /// spawned task.
496    ///
497    /// Callers that need to forcibly abort the agent loop (e.g. subagent
498    /// timeout) can call [`tokio::task::JoinHandle::abort`] on the returned handle.
499    /// Aborting the handle drops the in-flight LLM stream immediately
500    /// instead of waiting for the current turn to finish.
501    pub fn run_abortable(
502        &self,
503        thread_id: ThreadId,
504        input: AgentInput,
505        tool_context: ToolContext<Ctx>,
506        cancel_token: CancellationToken,
507    ) -> (
508        oneshot::Receiver<AgentRunState>,
509        tokio::task::JoinHandle<()>,
510    )
511    where
512        Ctx: Clone,
513    {
514        self.run_abortable_with_options(
515            thread_id,
516            input,
517            tool_context,
518            cancel_token,
519            RunOptions::default(),
520        )
521    }
522
523    /// Like [`run_abortable`](Self::run_abortable), but with
524    /// caller-supplied trace metadata. See [`run_with_options`](Self::run_with_options).
525    pub fn run_abortable_with_options(
526        &self,
527        thread_id: ThreadId,
528        input: AgentInput,
529        tool_context: ToolContext<Ctx>,
530        cancel_token: CancellationToken,
531        run_options: RunOptions,
532    ) -> (
533        oneshot::Receiver<AgentRunState>,
534        tokio::task::JoinHandle<()>,
535    )
536    where
537        Ctx: Clone,
538    {
539        // `run_options` only feeds OTel root-span metadata. On
540        // non-otel builds the value is genuinely not needed —
541        // explicitly drop it so the unused-variable / needless-pass
542        // lints stay quiet without us reaching for an
543        // `#[allow(...)]`.
544        #[cfg(not(feature = "otel"))]
545        drop(run_options);
546
547        let (state_tx, state_rx) = oneshot::channel();
548        let authority = self.resolve_authority();
549
550        let provider = Arc::clone(&self.provider);
551        let tools = Arc::clone(&self.tools);
552        let hooks = Arc::clone(&self.hooks);
553        let message_store = Arc::clone(&self.message_store);
554        let state_store = Arc::clone(&self.state_store);
555        let event_store = Arc::clone(&self.event_store);
556        let config = self.config.clone();
557        let compaction_config = self.compaction_config.clone();
558        let compactor = self.compactor.clone();
559        let execution_store = self.execution_store.clone();
560        let audit_sink = Arc::clone(&self.audit_sink);
561        #[cfg(feature = "otel")]
562        let observability_store = self.observability_store.clone();
563        #[cfg(feature = "otel")]
564        let parent_cx = crate::observability::context::capture_context();
565
566        let task = async move {
567            let result = run_loop_isolated(RunLoopParameters {
568                event_store,
569                authority,
570                thread_id,
571                input,
572                tool_context,
573                provider,
574                tools,
575                hooks,
576                message_store,
577                state_store,
578                config,
579                compaction_config,
580                compactor,
581                execution_store,
582                audit_sink,
583                cancel_token,
584                input_rx: None,
585                #[cfg(feature = "otel")]
586                run_options,
587                #[cfg(feature = "otel")]
588                observability_store,
589            })
590            .await;
591
592            let _ = state_tx.send(result);
593        };
594
595        #[cfg(feature = "otel")]
596        let task = {
597            use opentelemetry::trace::FutureExt;
598            task.with_context(parent_cx)
599        };
600
601        let handle = tokio::spawn(task);
602
603        (state_rx, handle)
604    }
605
606    /// Run the agent with a persistent input channel.
607    ///
608    /// Unlike [`Self::run`], this returns an [`AgentHandle`] that allows the caller
609    /// to inject new user messages into the running agent via `input_tx`.
610    /// The agent will process the initial input, then wait for new messages
611    /// on the channel between turns instead of exiting on `Done`.
612    ///
613    /// The agent exits when:
614    /// - The `input_tx` sender is dropped (no more messages) — reports `Done`
615    /// - The `cancel_token` is cancelled — reports `Cancelled`
616    /// - Max turns exceeded — reports `Error`
617    /// - An unsupported input variant is injected, or appending an injected
618    ///   message fails — reports `Error` (see [`AgentHandle::input_tx`])
619    pub fn run_persistent(
620        &self,
621        thread_id: ThreadId,
622        input: AgentInput,
623        tool_context: ToolContext<Ctx>,
624        cancel_token: CancellationToken,
625    ) -> AgentHandle
626    where
627        Ctx: Clone,
628    {
629        self.run_persistent_with_options(
630            thread_id,
631            input,
632            tool_context,
633            cancel_token,
634            RunOptions::default(),
635        )
636    }
637
638    /// Like [`run_persistent`](Self::run_persistent), but with
639    /// caller-supplied trace metadata. See
640    /// [`run_with_options`](Self::run_with_options).
641    pub fn run_persistent_with_options(
642        &self,
643        thread_id: ThreadId,
644        input: AgentInput,
645        tool_context: ToolContext<Ctx>,
646        cancel_token: CancellationToken,
647        run_options: RunOptions,
648    ) -> AgentHandle
649    where
650        Ctx: Clone,
651    {
652        // See `run_abortable_with_options` for why we explicitly
653        // drop `run_options` on non-otel builds.
654        #[cfg(not(feature = "otel"))]
655        drop(run_options);
656
657        let (state_tx, state_rx) = oneshot::channel();
658        let (input_tx, input_rx) = mpsc::channel(32);
659        let authority = self.resolve_authority();
660
661        let provider = Arc::clone(&self.provider);
662        let tools = Arc::clone(&self.tools);
663        let hooks = Arc::clone(&self.hooks);
664        let message_store = Arc::clone(&self.message_store);
665        let state_store = Arc::clone(&self.state_store);
666        let event_store = Arc::clone(&self.event_store);
667        let config = self.config.clone();
668        let compaction_config = self.compaction_config.clone();
669        let compactor = self.compactor.clone();
670        let execution_store = self.execution_store.clone();
671        let audit_sink = Arc::clone(&self.audit_sink);
672        #[cfg(feature = "otel")]
673        let observability_store = self.observability_store.clone();
674        let cancel_handle = cancel_token.clone();
675        #[cfg(feature = "otel")]
676        let parent_cx = crate::observability::context::capture_context();
677
678        let task = async move {
679            let result = run_loop_isolated(RunLoopParameters {
680                event_store,
681                authority,
682                thread_id,
683                input,
684                tool_context,
685                provider,
686                tools,
687                hooks,
688                message_store,
689                state_store,
690                config,
691                compaction_config,
692                compactor,
693                execution_store,
694                audit_sink,
695                cancel_token,
696                input_rx: Some(input_rx),
697                #[cfg(feature = "otel")]
698                run_options,
699                #[cfg(feature = "otel")]
700                observability_store,
701            })
702            .await;
703
704            let _ = state_tx.send(result);
705        };
706
707        #[cfg(feature = "otel")]
708        let task = {
709            use opentelemetry::trace::FutureExt;
710            task.with_context(parent_cx)
711        };
712
713        tokio::spawn(task);
714
715        AgentHandle {
716            input_tx,
717            state_rx,
718            cancel_token: cancel_handle,
719        }
720    }
721
722    /// Run a single turn of the agent loop — the authoritative server boundary.
723    ///
724    /// Unlike `run()`, this method executes exactly one turn **directly in the
725    /// caller's task** (no `tokio::spawn`) and returns the result inline. This
726    /// enables external orchestration where each turn can be dispatched as a
727    /// separate message (e.g., via Artemis or another message queue).
728    ///
729    /// When the `cancel_token` is cancelled, the turn will be aborted before
730    /// starting execution and return `TurnOutcome::Cancelled`.
731    ///
732    /// # Arguments
733    ///
734    /// * `thread_id` - The thread identifier for this conversation
735    /// * `input` - Text to start, Resume after confirmation, or Continue after a turn
736    /// * `tool_context` - Context passed to tools
737    /// * `cancel_token` - Token to signal cancellation from outside
738    /// * `options` - Execution options (tool runtime strategy, durability)
739    ///
740    /// # Returns
741    ///
742    /// A [`crate::types::TurnOutcome`] returned only after the configured event store's
743    /// `finish_turn(thread_id, turn)` barrier has completed.
744    ///
745    /// Every variant except [`crate::types::TurnOutcome::Error`] carries a
746    /// structured [`crate::types::TurnSummary`] in the `summary` field. This
747    /// summary is the **authoritative** server-facing outcome contract —
748    /// it contains the provider/model provenance, response ID, stop reason,
749    /// tool-call count, duration, and execution options for the turn. Server
750    /// code should read from `summary` rather than the legacy per-variant
751    /// fields (`total_turns`, `input_tokens`, `output_tokens`, …), which are
752    /// retained only for backwards compatibility with local callers.
753    ///
754    /// # Turn Outcomes
755    ///
756    /// - `NeedsMoreTurns` - Turn completed, call again with `AgentInput::Continue`
757    /// - `Done` - Agent completed successfully
758    /// - `AwaitingConfirmation` - Tool needs confirmation, call again with `AgentInput::Resume`
759    /// - `PendingToolCalls` - Tools need external execution (only with `ToolRuntime::External`)
760    /// - `Cancelled` - Turn was cancelled via the token
761    /// - `Error` - An error occurred (no summary attached)
762    ///
763    /// # Example
764    ///
765    /// ```ignore
766    /// use std::sync::Arc;
767    /// use agent_sdk::{InMemoryEventStore, TurnOptions};
768    ///
769    /// let cancel = CancellationToken::new();
770    /// let event_store = Arc::new(InMemoryEventStore::new());
771    /// let outcome = agent.run_turn(
772    ///     thread_id.clone(),
773    ///     AgentInput::Text("What is 2+2?".to_string()),
774    ///     tool_ctx.clone(),
775    ///     cancel,
776    ///     TurnOptions::default(),
777    /// ).await;
778    ///
779    /// let events = event_store.get_events(&thread_id).await?;
780    ///
781    /// // Read server-facing metadata from the TurnSummary.
782    /// if let Some(summary) = outcome.summary() {
783    ///     println!(
784    ///         "turn={} provider={} model={} stop={:?} response_id={:?}",
785    ///         summary.turn,
786    ///         summary.provenance.provider,
787    ///         summary.provenance.model,
788    ///         summary.stop_reason,
789    ///         summary.response_id,
790    ///     );
791    /// }
792    ///
793    /// // Branch on the variant for flow control.
794    /// match outcome {
795    ///     TurnOutcome::NeedsMoreTurns { turn, .. } => {
796    ///         // Dispatch another message to continue
797    ///     }
798    ///     TurnOutcome::Done { .. } => {
799    ///         // Conversation complete
800    ///     }
801    ///     TurnOutcome::PendingToolCalls { tool_calls, .. } => {
802    ///         // Execute tools externally, then call run_turn with Continue
803    ///     }
804    ///     _ => {}
805    /// }
806    /// ```
807    pub async fn run_turn(
808        &self,
809        thread_id: ThreadId,
810        input: AgentInput,
811        tool_context: ToolContext<Ctx>,
812        cancel_token: CancellationToken,
813        options: TurnOptions,
814    ) -> crate::types::TurnOutcome
815    where
816        Ctx: Clone,
817    {
818        self.run_turn_with_options(
819            thread_id,
820            input,
821            tool_context,
822            cancel_token,
823            options,
824            RunOptions::default(),
825        )
826        .await
827    }
828
829    /// Like [`run_turn`](Self::run_turn), but with caller-supplied
830    /// trace metadata.
831    ///
832    /// See [`run_with_options`](Self::run_with_options) for the full
833    /// [`RunOptions`] contract. The `turn_options` parameter retains
834    /// its existing semantics (tool runtime / strict durability);
835    /// `run_options` is layered on top to populate Langfuse trace
836    /// metadata on the root `invoke_agent` span.
837    pub async fn run_turn_with_options(
838        &self,
839        thread_id: ThreadId,
840        input: AgentInput,
841        tool_context: ToolContext<Ctx>,
842        cancel_token: CancellationToken,
843        turn_options: TurnOptions,
844        run_options: RunOptions,
845    ) -> crate::types::TurnOutcome
846    where
847        Ctx: Clone,
848    {
849        // See `run_abortable_with_options` for why we explicitly
850        // drop `run_options` on non-otel builds.
851        #[cfg(not(feature = "otel"))]
852        drop(run_options);
853
854        let authority = self.resolve_authority();
855
856        run_single_turn(TurnParameters {
857            event_store: Arc::clone(&self.event_store),
858            authority,
859            thread_id,
860            input,
861            tool_context,
862            provider: Arc::clone(&self.provider),
863            tools: Arc::clone(&self.tools),
864            hooks: Arc::clone(&self.hooks),
865            message_store: Arc::clone(&self.message_store),
866            state_store: Arc::clone(&self.state_store),
867            config: self.config.clone(),
868            compaction_config: self.compaction_config.clone(),
869            compactor: self.compactor.clone(),
870            execution_store: self.execution_store.clone(),
871            audit_sink: Arc::clone(&self.audit_sink),
872            cancel_token,
873            turn_options,
874            #[cfg(feature = "otel")]
875            run_options,
876            #[cfg(feature = "otel")]
877            observability_store: self.observability_store.clone(),
878        })
879        .await
880    }
881}
882
883/// High-level convenience API for agents whose tools take no application
884/// context (`Ctx = ()`).
885///
886/// [`ask`](Self::ask) and [`send`](Self::send) collapse the four pieces of
887/// ceremony in the low-level [`run`](Self::run) path — constructing a
888/// [`ToolContext::new(())`](crate::ToolContext::new), creating a
889/// [`CancellationToken`], awaiting the run, and reassembling the assistant
890/// text out of the event store — into a single call that returns a `String`.
891///
892/// Reach for [`run`](Self::run) or [`run_turn`](Self::run_turn) when you need
893/// application context, cancellation, confirmation flow, or access to the raw
894/// [`AgentRunState`].
895impl<P, H, M, S> AgentLoop<(), P, H, M, S>
896where
897    P: LlmProvider + 'static,
898    H: AgentHooks + 'static,
899    M: MessageStore + 'static,
900    S: StateStore + 'static,
901{
902    /// Ask the agent a question and return its assembled reply.
903    ///
904    /// This is the 30-second on-ramp: it builds a fresh
905    /// [`ToolContext::new(())`](crate::ToolContext::new) and a
906    /// [`CancellationToken`] internally, runs the agent to completion, and
907    /// returns the assistant text emitted during this call concatenated into
908    /// one `String`.
909    ///
910    /// For confirmation flows, application context, or explicit cancellation,
911    /// use [`run`](Self::run) directly.
912    ///
913    /// # Errors
914    ///
915    /// Returns an error if the run task is dropped before reporting a state,
916    /// if the run ends in [`AgentRunState::Error`], or if the event store
917    /// cannot be read back.
918    pub async fn ask(
919        &self,
920        thread_id: ThreadId,
921        text: impl Into<String>,
922    ) -> anyhow::Result<String> {
923        self.send(thread_id, AgentInput::Text(text.into())).await
924    }
925
926    /// Send an [`AgentInput`] to the agent and return its assembled reply.
927    ///
928    /// Like [`ask`](Self::ask) but accepts a full [`AgentInput`] (e.g. to
929    /// resume after confirmation). Builds the
930    /// [`ToolContext`](crate::ToolContext) and [`CancellationToken`]
931    /// internally and returns the assistant text emitted during this call.
932    ///
933    /// # Errors
934    ///
935    /// Returns an error if the run task is dropped before reporting a state,
936    /// if the run ends in [`AgentRunState::Error`], or if the event store
937    /// cannot be read back.
938    pub async fn send(&self, thread_id: ThreadId, input: AgentInput) -> anyhow::Result<String> {
939        use crate::events::AgentEvent;
940
941        // Snapshot the existing event count so we only assemble text emitted
942        // by this call, not earlier turns persisted on the same thread.
943        // `event_count` + `get_events_since` avoid materializing (and cloning)
944        // the entire thread history twice per call — repeated sends on a
945        // long-lived thread would otherwise be O(n^2) cumulative.
946        let baseline = self.event_store.event_count(&thread_id).await?;
947
948        let state = self
949            .run(
950                thread_id.clone(),
951                input,
952                ToolContext::new(()),
953                CancellationToken::new(),
954            )
955            .await?;
956
957        if let AgentRunState::Error(error) = state {
958            return Err(anyhow::Error::new(error));
959        }
960
961        let events = self
962            .event_store
963            .get_events_since(&thread_id, baseline)
964            .await?;
965        let reply = events
966            .into_iter()
967            .filter_map(|envelope| match envelope.event {
968                AgentEvent::Text { text, .. } => Some(text),
969                _ => None,
970            })
971            .collect::<String>();
972
973        Ok(reply)
974    }
975}