Skip to main content

agent_sdk/
agent_loop.rs

1//! Agent loop orchestration module.
2//!
3//! This module contains the core agent loop that orchestrates LLM calls,
4//! tool execution, and event handling. The agent loop is the main entry point
5//! for running an AI agent.
6//!
7//! # Architecture
8//!
9//! The agent loop works as follows:
10//! 1. Receives a user message
11//! 2. Sends the message to the LLM provider
12//! 3. Processes the LLM response (text or tool calls)
13//! 4. If tool calls are present, executes them and feeds results back to LLM
14//! 5. Repeats until the LLM responds with only text (no tool calls)
15//! 6. Persists events throughout to the configured event store
16//!
17//! # Building an Agent
18//!
19//! Use the builder pattern via [`builder()`] or [`AgentLoopBuilder`]:
20//!
21//! ```ignore
22//! use agent_sdk::{builder, providers::AnthropicProvider};
23//!
24//! let agent = builder()
25//!     .provider(AnthropicProvider::sonnet(api_key))
26//!     .tools(my_tools)
27//!     .event_store(event_store)
28//!     .build();
29//! ```
30
31mod builder;
32mod helpers;
33mod idempotency;
34mod listen;
35mod llm;
36mod run_loop;
37#[cfg(test)]
38mod test_utils;
39#[cfg(test)]
40mod tests;
41mod tool_execution;
42mod turn;
43mod types;
44
45use self::run_loop::{run_loop, run_single_turn};
46use self::types::{RunLoopParameters, TurnParameters};
47use crate::types::TurnOptions;
48
49pub use self::builder::AgentLoopBuilder;
50
51use crate::authority::{EventAuthority, LocalEventAuthority};
52use crate::context::{CompactionConfig, ContextCompactor};
53use crate::hooks::AgentHooks;
54use crate::llm::LlmProvider;
55use crate::stores::{EventStore, MessageStore, StateStore, ToolExecutionStore};
56use crate::tools::{ToolContext, ToolRegistry};
57use crate::types::{AgentConfig, AgentError, AgentInput, AgentRunState, RunOptions, ThreadId};
58use futures::FutureExt;
59use std::future::Future;
60use std::panic::AssertUnwindSafe;
61use std::sync::Arc;
62use tokio::sync::{mpsc, oneshot};
63use tokio_util::sync::CancellationToken;
64
65/// Run the agent loop with panic isolation at the spawned-task boundary.
66///
67/// A panic anywhere inside the run loop — most importantly inside the
68/// LLM provider call or the compaction provider call, neither of which
69/// is otherwise guarded — would unwind the spawned task, drop the
70/// `state_tx` oneshot, and surface to the caller as an opaque
71/// [`oneshot::error::RecvError`] (and, for subagents, be misclassified
72/// as `Disconnected`). Catching the unwind here turns it into a
73/// structured [`AgentRunState::Error`] that the task can still send on
74/// `state_tx`, so the run ends observably rather than silently tearing
75/// down the channel.
76///
77/// `AssertUnwindSafe` is sound at this boundary: the run loop owns its
78/// `RunLoopParameters` by value, so nothing it touches outlives the
79/// caught panic and is observed afterwards. The only value produced is
80/// the returned `AgentRunState`. Tool-level panics are already caught
81/// closer to the tool boundary (see
82/// `agent_loop::helpers::catch_tool_panic`) so the assistant
83/// `tool_use` / `tool_result` history stays balanced; this guard is the
84/// outer safety net for everything else.
85async fn run_loop_isolated<Ctx, P, H, M, S>(
86    params: RunLoopParameters<Ctx, P, H, M, S>,
87) -> AgentRunState
88where
89    Ctx: Send + Sync + Clone + 'static,
90    P: LlmProvider,
91    H: AgentHooks,
92    M: MessageStore,
93    S: StateStore,
94{
95    match AssertUnwindSafe(run_loop(params)).catch_unwind().await {
96        Ok(state) => state,
97        Err(payload) => {
98            let message = self::helpers::panic_payload_message(payload.as_ref());
99            log::error!("agent run loop panicked: {message}");
100            AgentRunState::Error(AgentError::new(
101                format!("Agent run panicked: {message}"),
102                false,
103            ))
104        }
105    }
106}
107
108/// Drop a spawned run task's [`tokio::task::JoinHandle`], logging a
109/// `debug!` to make the detach visible.
110///
111/// `run` / `run_with_options` intentionally drop the handle: the run is
112/// stopped through the cancel token or the per-tool timeout, not by
113/// aborting the task. Surfacing the detach at `debug` level gives a
114/// breadcrumb when a subprocess-backed tool that ignores the
115/// cooperative-cancel contract leaks a process after cancellation.
116fn warn_on_detached_run_handle(handle: tokio::task::JoinHandle<()>) {
117    log::debug!(
118        "agent run JoinHandle dropped (task detached); the run can only be \
119         stopped via its cancel token or per-tool timeout. Subprocess-backed \
120         tools must honour kill_on_drop or a token-aware kill to avoid leaks"
121    );
122    drop(handle);
123}
124
125/// Await a run's state receiver, mapping a dropped channel to an `anyhow`
126/// error so `run`/`run_with_options` can present an `impl Future` instead of
127/// a bare [`oneshot::Receiver`].
128///
129/// A panic inside the run is already converted to
130/// [`AgentRunState::Error`] before the channel send (see
131/// [`run_loop_isolated`]), so the only way `recv` fails is the sender being
132/// dropped without sending — a runtime shutdown rather than an agent error.
133async fn recv_run_state(
134    state_rx: oneshot::Receiver<AgentRunState>,
135) -> anyhow::Result<AgentRunState> {
136    state_rx
137        .await
138        .map_err(|_| anyhow::anyhow!("agent run task was dropped before reporting a final state"))
139}
140
/// Handle to a persistent agent thread.
///
/// Returned by [`AgentLoop::run_persistent`] and its `_with_options`
/// variant. Allows the caller to send new messages to the running agent,
/// observe its final state, and cancel execution.
pub struct AgentHandle {
    /// Send new messages to the running agent. The agent will process
    /// them as new user turns after completing the current turn.
    ///
    /// The channel is bounded (created with capacity 32). Dropping this
    /// sender is one of the ways the persistent run ends.
    pub input_tx: mpsc::Sender<AgentInput>,
    /// Final run state (sent once when the agent completes).
    ///
    /// The spawned task sends exactly one value; if the task goes away
    /// without sending, awaiting this receiver yields an error instead.
    pub state_rx: oneshot::Receiver<AgentRunState>,
    /// Cancel the running agent.
    ///
    /// This is a clone of the token the caller passed in, so cancelling
    /// either one cancels the run.
    pub cancel_token: CancellationToken,
}
154
/// Configuration bundle for constructing an [`AgentLoop`] with compaction.
///
/// Consumed by `AgentLoop::with_compaction`, which splits it back into
/// its two parts.
pub struct AgentLoopCompactionConfig {
    /// Base agent configuration; becomes the loop's `config`.
    pub agent_config: AgentConfig,
    /// Compaction settings; stored on the loop as `Some(..)`.
    pub compaction_config: CompactionConfig,
}

impl AgentLoopCompactionConfig {
    /// Bundle an agent configuration with a compaction configuration.
    #[must_use]
    pub const fn new(agent_config: AgentConfig, compaction_config: CompactionConfig) -> Self {
        Self {
            agent_config,
            compaction_config,
        }
    }
}
170
/// The main agent loop that orchestrates LLM calls and tool execution.
///
/// `AgentLoop` is the core component that:
/// - Manages conversation state via message and state stores
/// - Calls the LLM provider and processes responses
/// - Executes tools through the tool registry
/// - Persists events to the configured event store
/// - Enforces hooks for tool permissions and lifecycle events
///
/// # Type Parameters
///
/// - `Ctx`: Application-specific context passed to tools (e.g., user ID, database)
/// - `P`: The LLM provider implementation
/// - `H`: The hooks implementation for lifecycle customization
/// - `M`: The message store implementation
/// - `S`: The state store implementation
///
/// # Event Storage
///
/// Every loop instance requires an [`EventStore`] configured at construction
/// time. Events are written to that store for the entire lifecycle of the loop,
/// and callers read them back from the store instead of receiving an in-process
/// channel from the runtime.
///
/// # Running the Agent
///
/// `run` takes a cancellation token as its fourth argument and consumes
/// `thread_id`, so clone the id if it is needed afterwards:
///
/// ```ignore
/// let final_state = agent.run(
///     thread_id.clone(),
///     AgentInput::Text("Hello!".to_string()),
///     tool_ctx,
///     CancellationToken::new(),
/// );
/// let state = final_state.await?;
/// // `event_store` is the store the agent was constructed with.
/// let events = event_store.get_events(&thread_id).await?;
/// ```
pub struct AgentLoop<Ctx, P, H, M, S>
where
    P: LlmProvider,
    H: AgentHooks,
    M: MessageStore,
    S: StateStore,
{
    /// LLM provider; held in an `Arc` so each run can share it.
    pub(super) provider: Arc<P>,
    /// Registry of tools available to the agent.
    pub(super) tools: Arc<ToolRegistry<Ctx>>,
    /// Lifecycle / permission hooks.
    pub(super) hooks: Arc<H>,
    /// Conversation message store.
    pub(super) message_store: Arc<M>,
    /// Agent state store.
    pub(super) state_store: Arc<S>,
    /// Event store supplied at construction time.
    pub(super) event_store: Arc<dyn EventStore>,
    /// Externally supplied event authority. When `None`, each run gets a
    /// fresh `LocalEventAuthority` (see `resolve_authority`).
    pub(super) event_authority: Option<Arc<dyn EventAuthority>>,
    /// Agent configuration, cloned into every run.
    pub(super) config: AgentConfig,
    /// Compaction settings; `Some` when built via `with_compaction`.
    pub(super) compaction_config: Option<CompactionConfig>,
    /// Optional context compactor; the constructors in this file leave it `None`.
    pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
    /// Optional tool-execution store; the constructors in this file leave it `None`.
    pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
    /// Tool audit sink; defaults to `NoopAuditSink`, replaceable via `with_audit_sink`.
    pub(super) audit_sink: Arc<dyn crate::hooks::ToolAuditSink>,
    /// Observability store (only with the `otel` feature); set via
    /// `with_observability_store`.
    #[cfg(feature = "otel")]
    pub(super) observability_store: Option<Arc<dyn crate::observability::ObservabilityStore>>,
}
228
229/// Create a new builder for constructing an `AgentLoop`.
230#[must_use]
231pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
232    AgentLoopBuilder::new()
233}
234
235impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
236where
237    Ctx: Send + Sync + 'static,
238    P: LlmProvider + 'static,
239    H: AgentHooks + 'static,
240    M: MessageStore + 'static,
241    S: StateStore + 'static,
242{
243    /// Create a new agent loop with all components specified directly.
244    #[must_use]
245    pub fn new(
246        provider: P,
247        tools: ToolRegistry<Ctx>,
248        hooks: H,
249        message_store: M,
250        state_store: S,
251        event_store: Arc<dyn EventStore>,
252        config: AgentConfig,
253    ) -> Self {
254        Self {
255            provider: Arc::new(provider),
256            tools: Arc::new(tools),
257            hooks: Arc::new(hooks),
258            message_store: Arc::new(message_store),
259            state_store: Arc::new(state_store),
260            event_store,
261            event_authority: None,
262            config,
263            compaction_config: None,
264            compactor: None,
265            execution_store: None,
266            audit_sink: Arc::new(crate::hooks::NoopAuditSink),
267            #[cfg(feature = "otel")]
268            observability_store: None,
269        }
270    }
271
272    /// Create a new agent loop with compaction enabled.
273    #[must_use]
274    pub fn with_compaction(
275        provider: P,
276        tools: ToolRegistry<Ctx>,
277        hooks: H,
278        message_store: M,
279        state_store: S,
280        event_store: Arc<dyn EventStore>,
281        config: AgentLoopCompactionConfig,
282    ) -> Self {
283        let AgentLoopCompactionConfig {
284            agent_config,
285            compaction_config,
286        } = config;
287        Self {
288            provider: Arc::new(provider),
289            tools: Arc::new(tools),
290            hooks: Arc::new(hooks),
291            message_store: Arc::new(message_store),
292            state_store: Arc::new(state_store),
293            event_store,
294            event_authority: None,
295            config: agent_config,
296            compaction_config: Some(compaction_config),
297            compactor: None,
298            execution_store: None,
299            audit_sink: Arc::new(crate::hooks::NoopAuditSink),
300            #[cfg(feature = "otel")]
301            observability_store: None,
302        }
303    }
304
305    /// Set the authoritative tool audit sink.
306    ///
307    /// When set, the loop emits a [`ToolAuditRecord`](crate::advanced::ToolAuditRecord)
308    /// at every tool-lifecycle transition (blocked, requires-confirmation,
309    /// cached, replayed, invalidated, completed, persistence-failed).
310    ///
311    /// The default is [`NoopAuditSink`](crate::hooks::NoopAuditSink) which
312    /// discards every record — suitable for local/CLI usage. Servers should
313    /// swap in a durable sink.
314    #[must_use]
315    pub fn with_audit_sink(mut self, sink: impl crate::hooks::ToolAuditSink + 'static) -> Self {
316        self.audit_sink = Arc::new(sink);
317        self
318    }
319
320    /// Set the observability store for `GenAI` payload capture.
321    ///
322    /// When set, the store is called at each LLM request boundary to decide
323    /// whether payloads are inlined on spans, externalized, or omitted.
324    #[cfg(feature = "otel")]
325    #[must_use]
326    pub fn with_observability_store(
327        mut self,
328        store: impl crate::observability::ObservabilityStore + 'static,
329    ) -> Self {
330        self.observability_store = Some(Arc::new(store));
331        self
332    }
333
334    /// Resolve the event authority for this run.
335    ///
336    /// If an external authority was configured via the builder, use it.
337    /// Otherwise create a fresh [`LocalEventAuthority`] that starts at 0
338    /// (the pre-existing local/CLI behaviour).
339    fn resolve_authority(&self) -> Arc<dyn EventAuthority> {
340        self.event_authority
341            .clone()
342            .unwrap_or_else(|| Arc::new(LocalEventAuthority::new()))
343    }
344
345    /// Run the agent loop.
346    ///
347    /// This method allows the agent to pause when a tool requires confirmation,
348    /// returning an `AgentRunState::AwaitingConfirmation` that contains the
349    /// state needed to resume.
350    ///
351    /// When the `cancel_token` is cancelled, the agent will stop after the
352    /// current turn completes (no new turns will start). The final state will
353    /// be `AgentRunState::Cancelled`.
354    ///
355    /// # Arguments
356    ///
357    /// * `thread_id` - The thread identifier for this conversation
358    /// * `input` - Either a new text message or a resume with confirmation decision
359    /// * `tool_context` - Context passed to tools
360    /// * `cancel_token` - Token to signal cancellation from outside
361    ///
362    /// # Returns
363    ///
364    /// A future that resolves to the final [`AgentRunState`]. Awaiting it
365    /// drives the run to completion:
366    ///
367    /// ```ignore
368    /// let final_state = agent.run(thread_id, input, tool_ctx, cancel).await?;
369    /// ```
370    ///
371    /// The future is `'static` — the run is already spawned on a Tokio task
372    /// before this returns, so dropping the future does **not** stop the run
373    /// (use `cancel_token`). Awaiting it only waits for the result.
374    ///
375    /// # Example
376    ///
377    /// ```ignore
378    /// let cancel = CancellationToken::new();
379    /// let final_state = agent.run(
380    ///     thread_id,
381    ///     AgentInput::Text("Hello".to_string()),
382    ///     tool_ctx,
383    ///     cancel.clone(),
384    /// ).await?;
385    ///
386    /// match final_state {
387    ///     AgentRunState::Done { .. } => { /* completed */ }
388    ///     AgentRunState::Cancelled { .. } => { /* user cancelled */ }
389    ///     AgentRunState::AwaitingConfirmation { continuation, .. } => {
390    ///         // Get user decision, then resume:
391    ///         let state2 = agent.run(
392    ///             thread_id,
393    ///             AgentInput::Resume {
394    ///                 continuation,
395    ///                 tool_call_id: id,
396    ///                 confirmed: true,
397    ///                 rejection_reason: None,
398    ///             },
399    ///             tool_ctx,
400    ///             cancel.clone(),
401    ///         ).await?;
402    ///     }
403    ///     AgentRunState::Error(e) => { /* handle error */ }
404    /// }
405    /// ```
406    /// # Cancellation, timeout, and the dropped `JoinHandle`
407    ///
408    /// `run` spawns the agent loop on a Tokio task and returns a future over
409    /// the state channel — it **drops** the task's
410    /// [`tokio::task::JoinHandle`]. Dropping a `JoinHandle` *detaches* the
411    /// task rather than aborting it, so the only ways to stop an in-flight
412    /// run are the `cancel_token` (cooperative) or the per-tool
413    /// [`AgentConfig::tool_timeout_ms`](crate::types::AgentConfig::tool_timeout_ms)
414    /// boundary. Callers that need to forcibly abort must use
415    /// [`run_abortable`](Self::run_abortable) and keep the handle.
416    ///
417    /// Because the handle is dropped, a tool that holds a subprocess open
418    /// must obey the
419    /// [cooperative-cancel contract](crate::tools::Tool#cooperative-cancellation)
420    /// (`kill_on_drop` or a token-aware `kill`) or the subprocess will
421    /// outlive the cancelled / timed-out run. A `debug!` is logged here so
422    /// the detach is visible when chasing a leaked subprocess.
423    ///
424    /// # Errors
425    ///
426    /// Returns an error only if the spawned run task is dropped before it can
427    /// report a final state (e.g. a runtime shutdown). A panic inside the run
428    /// is caught and surfaced as [`AgentRunState::Error`], not as an `Err`.
429    pub fn run(
430        &self,
431        thread_id: ThreadId,
432        input: AgentInput,
433        tool_context: ToolContext<Ctx>,
434        cancel_token: CancellationToken,
435    ) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
436    where
437        Ctx: Clone,
438    {
439        let (state_rx, handle) = self.run_abortable(thread_id, input, tool_context, cancel_token);
440        warn_on_detached_run_handle(handle);
441        recv_run_state(state_rx)
442    }
443
444    /// Like [`run`](Self::run), but with caller-supplied trace metadata.
445    ///
446    /// Equivalent to `run` except that the supplied [`RunOptions`]
447    /// configure session/user IDs (propagated as `session.id` /
448    /// `user.id` baggage), `langfuse.trace.{name,tags,metadata.*,
449    /// input,output}`, `langfuse.{release,environment}`, and the
450    /// trace-text truncation ceiling.
451    ///
452    /// Use this instead of `run` whenever the consumer needs the
453    /// SDK to populate Langfuse trace metadata; `run` itself
454    /// continues to delegate here with `RunOptions::default()`.
455    ///
456    /// # Errors
457    ///
458    /// See [`run`](Self::run).
459    pub fn run_with_options(
460        &self,
461        thread_id: ThreadId,
462        input: AgentInput,
463        tool_context: ToolContext<Ctx>,
464        cancel_token: CancellationToken,
465        run_options: RunOptions,
466    ) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
467    where
468        Ctx: Clone,
469    {
470        let (state_rx, handle) = self.run_abortable_with_options(
471            thread_id,
472            input,
473            tool_context,
474            cancel_token,
475            run_options,
476        );
477        warn_on_detached_run_handle(handle);
478        recv_run_state(state_rx)
479    }
480
481    /// Like [`run`](Self::run), but also returns the [`tokio::task::JoinHandle`] for the
482    /// spawned task.
483    ///
484    /// Callers that need to forcibly abort the agent loop (e.g. subagent
485    /// timeout) can call [`tokio::task::JoinHandle::abort`] on the returned handle.
486    /// Aborting the handle drops the in-flight LLM stream immediately
487    /// instead of waiting for the current turn to finish.
488    pub fn run_abortable(
489        &self,
490        thread_id: ThreadId,
491        input: AgentInput,
492        tool_context: ToolContext<Ctx>,
493        cancel_token: CancellationToken,
494    ) -> (
495        oneshot::Receiver<AgentRunState>,
496        tokio::task::JoinHandle<()>,
497    )
498    where
499        Ctx: Clone,
500    {
501        self.run_abortable_with_options(
502            thread_id,
503            input,
504            tool_context,
505            cancel_token,
506            RunOptions::default(),
507        )
508    }
509
510    /// Like [`run_abortable`](Self::run_abortable), but with
511    /// caller-supplied trace metadata. See [`run_with_options`](Self::run_with_options).
512    pub fn run_abortable_with_options(
513        &self,
514        thread_id: ThreadId,
515        input: AgentInput,
516        tool_context: ToolContext<Ctx>,
517        cancel_token: CancellationToken,
518        run_options: RunOptions,
519    ) -> (
520        oneshot::Receiver<AgentRunState>,
521        tokio::task::JoinHandle<()>,
522    )
523    where
524        Ctx: Clone,
525    {
526        // `run_options` only feeds OTel root-span metadata. On
527        // non-otel builds the value is genuinely not needed —
528        // explicitly drop it so the unused-variable / needless-pass
529        // lints stay quiet without us reaching for an
530        // `#[allow(...)]`.
531        #[cfg(not(feature = "otel"))]
532        drop(run_options);
533
534        let (state_tx, state_rx) = oneshot::channel();
535        let authority = self.resolve_authority();
536
537        let provider = Arc::clone(&self.provider);
538        let tools = Arc::clone(&self.tools);
539        let hooks = Arc::clone(&self.hooks);
540        let message_store = Arc::clone(&self.message_store);
541        let state_store = Arc::clone(&self.state_store);
542        let event_store = Arc::clone(&self.event_store);
543        let config = self.config.clone();
544        let compaction_config = self.compaction_config.clone();
545        let compactor = self.compactor.clone();
546        let execution_store = self.execution_store.clone();
547        let audit_sink = Arc::clone(&self.audit_sink);
548        #[cfg(feature = "otel")]
549        let observability_store = self.observability_store.clone();
550        #[cfg(feature = "otel")]
551        let parent_cx = crate::observability::context::capture_context();
552
553        let task = async move {
554            let result = run_loop_isolated(RunLoopParameters {
555                event_store,
556                authority,
557                thread_id,
558                input,
559                tool_context,
560                provider,
561                tools,
562                hooks,
563                message_store,
564                state_store,
565                config,
566                compaction_config,
567                compactor,
568                execution_store,
569                audit_sink,
570                cancel_token,
571                input_rx: None,
572                #[cfg(feature = "otel")]
573                run_options,
574                #[cfg(feature = "otel")]
575                observability_store,
576            })
577            .await;
578
579            let _ = state_tx.send(result);
580        };
581
582        #[cfg(feature = "otel")]
583        let task = {
584            use opentelemetry::trace::FutureExt;
585            task.with_context(parent_cx)
586        };
587
588        let handle = tokio::spawn(task);
589
590        (state_rx, handle)
591    }
592
593    /// Run the agent with a persistent input channel.
594    ///
595    /// Unlike [`Self::run`], this returns an [`AgentHandle`] that allows the caller
596    /// to inject new user messages into the running agent via `input_tx`.
597    /// The agent will process the initial input, then wait for new messages
598    /// on the channel between turns instead of exiting on `Done`.
599    ///
600    /// The agent exits when:
601    /// - The `input_tx` sender is dropped (no more messages)
602    /// - The `cancel_token` is cancelled
603    /// - Max turns exceeded
604    pub fn run_persistent(
605        &self,
606        thread_id: ThreadId,
607        input: AgentInput,
608        tool_context: ToolContext<Ctx>,
609        cancel_token: CancellationToken,
610    ) -> AgentHandle
611    where
612        Ctx: Clone,
613    {
614        self.run_persistent_with_options(
615            thread_id,
616            input,
617            tool_context,
618            cancel_token,
619            RunOptions::default(),
620        )
621    }
622
623    /// Like [`run_persistent`](Self::run_persistent), but with
624    /// caller-supplied trace metadata. See
625    /// [`run_with_options`](Self::run_with_options).
626    pub fn run_persistent_with_options(
627        &self,
628        thread_id: ThreadId,
629        input: AgentInput,
630        tool_context: ToolContext<Ctx>,
631        cancel_token: CancellationToken,
632        run_options: RunOptions,
633    ) -> AgentHandle
634    where
635        Ctx: Clone,
636    {
637        // See `run_abortable_with_options` for why we explicitly
638        // drop `run_options` on non-otel builds.
639        #[cfg(not(feature = "otel"))]
640        drop(run_options);
641
642        let (state_tx, state_rx) = oneshot::channel();
643        let (input_tx, input_rx) = mpsc::channel(32);
644        let authority = self.resolve_authority();
645
646        let provider = Arc::clone(&self.provider);
647        let tools = Arc::clone(&self.tools);
648        let hooks = Arc::clone(&self.hooks);
649        let message_store = Arc::clone(&self.message_store);
650        let state_store = Arc::clone(&self.state_store);
651        let event_store = Arc::clone(&self.event_store);
652        let config = self.config.clone();
653        let compaction_config = self.compaction_config.clone();
654        let compactor = self.compactor.clone();
655        let execution_store = self.execution_store.clone();
656        let audit_sink = Arc::clone(&self.audit_sink);
657        #[cfg(feature = "otel")]
658        let observability_store = self.observability_store.clone();
659        let cancel_handle = cancel_token.clone();
660        #[cfg(feature = "otel")]
661        let parent_cx = crate::observability::context::capture_context();
662
663        let task = async move {
664            let result = run_loop_isolated(RunLoopParameters {
665                event_store,
666                authority,
667                thread_id,
668                input,
669                tool_context,
670                provider,
671                tools,
672                hooks,
673                message_store,
674                state_store,
675                config,
676                compaction_config,
677                compactor,
678                execution_store,
679                audit_sink,
680                cancel_token,
681                input_rx: Some(input_rx),
682                #[cfg(feature = "otel")]
683                run_options,
684                #[cfg(feature = "otel")]
685                observability_store,
686            })
687            .await;
688
689            let _ = state_tx.send(result);
690        };
691
692        #[cfg(feature = "otel")]
693        let task = {
694            use opentelemetry::trace::FutureExt;
695            task.with_context(parent_cx)
696        };
697
698        tokio::spawn(task);
699
700        AgentHandle {
701            input_tx,
702            state_rx,
703            cancel_token: cancel_handle,
704        }
705    }
706
707    /// Run a single turn of the agent loop — the authoritative server boundary.
708    ///
709    /// Unlike `run()`, this method executes exactly one turn **directly in the
710    /// caller's task** (no `tokio::spawn`) and returns the result inline. This
711    /// enables external orchestration where each turn can be dispatched as a
712    /// separate message (e.g., via Artemis or another message queue).
713    ///
714    /// When the `cancel_token` is cancelled, the turn will be aborted before
715    /// starting execution and return `TurnOutcome::Cancelled`.
716    ///
717    /// # Arguments
718    ///
719    /// * `thread_id` - The thread identifier for this conversation
720    /// * `input` - Text to start, Resume after confirmation, or Continue after a turn
721    /// * `tool_context` - Context passed to tools
722    /// * `cancel_token` - Token to signal cancellation from outside
723    /// * `options` - Execution options (tool runtime strategy, durability)
724    ///
725    /// # Returns
726    ///
727    /// A [`crate::types::TurnOutcome`] returned only after the configured event store's
728    /// `finish_turn(thread_id, turn)` barrier has completed.
729    ///
730    /// Every variant except [`crate::types::TurnOutcome::Error`] carries a
731    /// structured [`crate::types::TurnSummary`] in the `summary` field. This
732    /// summary is the **authoritative** server-facing outcome contract —
733    /// it contains the provider/model provenance, response ID, stop reason,
734    /// tool-call count, duration, and execution options for the turn. Server
735    /// code should read from `summary` rather than the legacy per-variant
736    /// fields (`total_turns`, `input_tokens`, `output_tokens`, …), which are
737    /// retained only for backwards compatibility with local callers.
738    ///
739    /// # Turn Outcomes
740    ///
741    /// - `NeedsMoreTurns` - Turn completed, call again with `AgentInput::Continue`
742    /// - `Done` - Agent completed successfully
743    /// - `AwaitingConfirmation` - Tool needs confirmation, call again with `AgentInput::Resume`
744    /// - `PendingToolCalls` - Tools need external execution (only with `ToolRuntime::External`)
745    /// - `Cancelled` - Turn was cancelled via the token
746    /// - `Error` - An error occurred (no summary attached)
747    ///
748    /// # Example
749    ///
750    /// ```ignore
751    /// use std::sync::Arc;
752    /// use agent_sdk::{InMemoryEventStore, TurnOptions};
753    ///
754    /// let cancel = CancellationToken::new();
755    /// let event_store = Arc::new(InMemoryEventStore::new());
756    /// let outcome = agent.run_turn(
757    ///     thread_id.clone(),
758    ///     AgentInput::Text("What is 2+2?".to_string()),
759    ///     tool_ctx.clone(),
760    ///     cancel,
761    ///     TurnOptions::default(),
762    /// ).await;
763    ///
764    /// let events = event_store.get_events(&thread_id).await?;
765    ///
766    /// // Read server-facing metadata from the TurnSummary.
767    /// if let Some(summary) = outcome.summary() {
768    ///     println!(
769    ///         "turn={} provider={} model={} stop={:?} response_id={:?}",
770    ///         summary.turn,
771    ///         summary.provenance.provider,
772    ///         summary.provenance.model,
773    ///         summary.stop_reason,
774    ///         summary.response_id,
775    ///     );
776    /// }
777    ///
778    /// // Branch on the variant for flow control.
779    /// match outcome {
780    ///     TurnOutcome::NeedsMoreTurns { turn, .. } => {
781    ///         // Dispatch another message to continue
782    ///     }
783    ///     TurnOutcome::Done { .. } => {
784    ///         // Conversation complete
785    ///     }
786    ///     TurnOutcome::PendingToolCalls { tool_calls, .. } => {
787    ///         // Execute tools externally, then call run_turn with Continue
788    ///     }
789    ///     _ => {}
790    /// }
791    /// ```
792    pub async fn run_turn(
793        &self,
794        thread_id: ThreadId,
795        input: AgentInput,
796        tool_context: ToolContext<Ctx>,
797        cancel_token: CancellationToken,
798        options: TurnOptions,
799    ) -> crate::types::TurnOutcome
800    where
801        Ctx: Clone,
802    {
803        self.run_turn_with_options(
804            thread_id,
805            input,
806            tool_context,
807            cancel_token,
808            options,
809            RunOptions::default(),
810        )
811        .await
812    }
813
814    /// Like [`run_turn`](Self::run_turn), but with caller-supplied
815    /// trace metadata.
816    ///
817    /// See [`run_with_options`](Self::run_with_options) for the full
818    /// [`RunOptions`] contract. The `turn_options` parameter retains
819    /// its existing semantics (tool runtime / strict durability);
820    /// `run_options` is layered on top to populate Langfuse trace
821    /// metadata on the root `invoke_agent` span.
822    pub async fn run_turn_with_options(
823        &self,
824        thread_id: ThreadId,
825        input: AgentInput,
826        tool_context: ToolContext<Ctx>,
827        cancel_token: CancellationToken,
828        turn_options: TurnOptions,
829        run_options: RunOptions,
830    ) -> crate::types::TurnOutcome
831    where
832        Ctx: Clone,
833    {
834        // See `run_abortable_with_options` for why we explicitly
835        // drop `run_options` on non-otel builds.
836        #[cfg(not(feature = "otel"))]
837        drop(run_options);
838
839        let authority = self.resolve_authority();
840
841        run_single_turn(TurnParameters {
842            event_store: Arc::clone(&self.event_store),
843            authority,
844            thread_id,
845            input,
846            tool_context,
847            provider: Arc::clone(&self.provider),
848            tools: Arc::clone(&self.tools),
849            hooks: Arc::clone(&self.hooks),
850            message_store: Arc::clone(&self.message_store),
851            state_store: Arc::clone(&self.state_store),
852            config: self.config.clone(),
853            compaction_config: self.compaction_config.clone(),
854            compactor: self.compactor.clone(),
855            execution_store: self.execution_store.clone(),
856            audit_sink: Arc::clone(&self.audit_sink),
857            cancel_token,
858            turn_options,
859            #[cfg(feature = "otel")]
860            run_options,
861            #[cfg(feature = "otel")]
862            observability_store: self.observability_store.clone(),
863        })
864        .await
865    }
866}
867
868/// High-level convenience API for agents whose tools take no application
869/// context (`Ctx = ()`).
870///
871/// [`ask`](Self::ask) and [`send`](Self::send) collapse the four pieces of
872/// ceremony in the low-level [`run`](Self::run) path — constructing a
873/// [`ToolContext::new(())`](crate::ToolContext::new), creating a
874/// [`CancellationToken`], awaiting the run, and reassembling the assistant
875/// text out of the event store — into a single call that returns a `String`.
876///
877/// Reach for [`run`](Self::run) or [`run_turn`](Self::run_turn) when you need
878/// application context, cancellation, confirmation flow, or access to the raw
879/// [`AgentRunState`].
880impl<P, H, M, S> AgentLoop<(), P, H, M, S>
881where
882    P: LlmProvider + 'static,
883    H: AgentHooks + 'static,
884    M: MessageStore + 'static,
885    S: StateStore + 'static,
886{
887    /// Ask the agent a question and return its assembled reply.
888    ///
889    /// This is the 30-second on-ramp: it builds a fresh
890    /// [`ToolContext::new(())`](crate::ToolContext::new) and a
891    /// [`CancellationToken`] internally, runs the agent to completion, and
892    /// returns the assistant text emitted during this call concatenated into
893    /// one `String`.
894    ///
895    /// For confirmation flows, application context, or explicit cancellation,
896    /// use [`run`](Self::run) directly.
897    ///
898    /// # Errors
899    ///
900    /// Returns an error if the run task is dropped before reporting a state,
901    /// if the run ends in [`AgentRunState::Error`], or if the event store
902    /// cannot be read back.
903    pub async fn ask(
904        &self,
905        thread_id: ThreadId,
906        text: impl Into<String>,
907    ) -> anyhow::Result<String> {
908        self.send(thread_id, AgentInput::Text(text.into())).await
909    }
910
911    /// Send an [`AgentInput`] to the agent and return its assembled reply.
912    ///
913    /// Like [`ask`](Self::ask) but accepts a full [`AgentInput`] (e.g. to
914    /// resume after confirmation). Builds the
915    /// [`ToolContext`](crate::ToolContext) and [`CancellationToken`]
916    /// internally and returns the assistant text emitted during this call.
917    ///
918    /// # Errors
919    ///
920    /// Returns an error if the run task is dropped before reporting a state,
921    /// if the run ends in [`AgentRunState::Error`], or if the event store
922    /// cannot be read back.
923    pub async fn send(&self, thread_id: ThreadId, input: AgentInput) -> anyhow::Result<String> {
924        use crate::events::AgentEvent;
925
926        // Snapshot the existing event count so we only assemble text emitted
927        // by this call, not earlier turns persisted on the same thread.
928        let baseline = self.event_store.get_events(&thread_id).await?.len();
929
930        let state = self
931            .run(
932                thread_id.clone(),
933                input,
934                ToolContext::new(()),
935                CancellationToken::new(),
936            )
937            .await?;
938
939        if let AgentRunState::Error(error) = state {
940            return Err(anyhow::Error::new(error));
941        }
942
943        let events = self.event_store.get_events(&thread_id).await?;
944        let reply = events
945            .into_iter()
946            .skip(baseline)
947            .filter_map(|envelope| match envelope.event {
948                AgentEvent::Text { text, .. } => Some(text),
949                _ => None,
950            })
951            .collect::<String>();
952
953        Ok(reply)
954    }
955}