agent_sdk/agent_loop.rs
1//! Agent loop orchestration module.
2//!
3//! This module contains the core agent loop that orchestrates LLM calls,
4//! tool execution, and event handling. The agent loop is the main entry point
5//! for running an AI agent.
6//!
7//! # Architecture
8//!
9//! The agent loop works as follows:
10//! 1. Receives a user message
11//! 2. Sends the message to the LLM provider
12//! 3. Processes the LLM response (text or tool calls)
13//! 4. If tool calls are present, executes them and feeds results back to LLM
14//! 5. Repeats until the LLM responds with only text (no tool calls)
15//! 6. Persists events throughout to the configured event store
16//!
17//! # Building an Agent
18//!
19//! Use the builder pattern via [`builder()`] or [`AgentLoopBuilder`]:
20//!
21//! ```ignore
22//! use agent_sdk::{builder, providers::AnthropicProvider};
23//!
24//! let agent = builder()
25//! .provider(AnthropicProvider::sonnet(api_key))
26//! .tools(my_tools)
27//! .event_store(event_store)
28//! .build();
29//! ```
30
31mod builder;
32mod helpers;
33mod idempotency;
34mod listen;
35mod llm;
36mod run_loop;
37#[cfg(test)]
38mod test_utils;
39#[cfg(test)]
40mod tests;
41mod tool_execution;
42mod turn;
43mod types;
44
45use self::run_loop::{run_loop, run_single_turn};
46use self::types::{RunLoopParameters, TurnParameters};
47use crate::types::TurnOptions;
48
49pub use self::builder::AgentLoopBuilder;
50
51use crate::authority::{EventAuthority, LocalEventAuthority};
52use crate::context::{CompactionConfig, ContextCompactor};
53use crate::hooks::AgentHooks;
54use crate::llm::LlmProvider;
55use crate::stores::{EventStore, MessageStore, StateStore, ToolExecutionStore};
56use crate::tools::{ToolContext, ToolRegistry};
57use crate::types::{AgentConfig, AgentError, AgentInput, AgentRunState, RunOptions, ThreadId};
58use futures::FutureExt;
59use std::future::Future;
60use std::panic::AssertUnwindSafe;
61use std::sync::Arc;
62use tokio::sync::{mpsc, oneshot};
63use tokio_util::sync::CancellationToken;
64
65/// Run the agent loop with panic isolation at the spawned-task boundary.
66///
67/// A panic anywhere inside the run loop — most importantly inside the
68/// LLM provider call or the compaction provider call, neither of which
69/// is otherwise guarded — would unwind the spawned task, drop the
70/// `state_tx` oneshot, and surface to the caller as an opaque
71/// [`oneshot::error::RecvError`] (and, for subagents, be misclassified
72/// as `Disconnected`). Catching the unwind here turns it into a
73/// structured [`AgentRunState::Error`] that the task can still send on
74/// `state_tx`, so the run ends observably rather than silently tearing
75/// down the channel.
76///
77/// `AssertUnwindSafe` is sound at this boundary: the run loop owns its
78/// `RunLoopParameters` by value, so nothing it touches outlives the
79/// caught panic and is observed afterwards. The only value produced is
80/// the returned `AgentRunState`. Tool-level panics are already caught
81/// closer to the tool boundary (see
82/// `agent_loop::helpers::catch_tool_panic`) so the assistant
83/// `tool_use` / `tool_result` history stays balanced; this guard is the
84/// outer safety net for everything else.
85async fn run_loop_isolated<Ctx, P, H, M, S>(
86 params: RunLoopParameters<Ctx, P, H, M, S>,
87) -> AgentRunState
88where
89 Ctx: Send + Sync + Clone + 'static,
90 P: LlmProvider,
91 H: AgentHooks,
92 M: MessageStore,
93 S: StateStore,
94{
95 match AssertUnwindSafe(run_loop(params)).catch_unwind().await {
96 Ok(state) => state,
97 Err(payload) => {
98 let message = self::helpers::panic_payload_message(payload.as_ref());
99 log::error!("agent run loop panicked: {message}");
100 AgentRunState::Error(AgentError::new(
101 format!("Agent run panicked: {message}"),
102 false,
103 ))
104 }
105 }
106}
107
108/// Drop a spawned run task's [`tokio::task::JoinHandle`], logging a
109/// `debug!` to make the detach visible.
110///
111/// `run` / `run_with_options` intentionally drop the handle: the run is
112/// stopped through the cancel token or the per-tool timeout, not by
113/// aborting the task. Surfacing the detach at `debug` level gives a
114/// breadcrumb when a subprocess-backed tool that ignores the
115/// cooperative-cancel contract leaks a process after cancellation.
116fn warn_on_detached_run_handle(handle: tokio::task::JoinHandle<()>) {
117 log::debug!(
118 "agent run JoinHandle dropped (task detached); the run can only be \
119 stopped via its cancel token or per-tool timeout. Subprocess-backed \
120 tools must honour kill_on_drop or a token-aware kill to avoid leaks"
121 );
122 drop(handle);
123}
124
125/// Await a run's state receiver, mapping a dropped channel to an `anyhow`
126/// error so `run`/`run_with_options` can present an `impl Future` instead of
127/// a bare [`oneshot::Receiver`].
128///
129/// A panic inside the run is already converted to
130/// [`AgentRunState::Error`] before the channel send (see
131/// [`run_loop_isolated`]), so the only way `recv` fails is the sender being
132/// dropped without sending — a runtime shutdown rather than an agent error.
133async fn recv_run_state(
134 state_rx: oneshot::Receiver<AgentRunState>,
135) -> anyhow::Result<AgentRunState> {
136 state_rx
137 .await
138 .map_err(|_| anyhow::anyhow!("agent run task was dropped before reporting a final state"))
139}
140
141/// Handle to a persistent agent thread.
142///
143/// Returned by [`AgentLoop::run_persistent`]. Allows the caller to send
144/// new messages to the running agent and cancel execution.
145pub struct AgentHandle {
146 /// Send new messages to the running agent. The agent will process
147 /// them as new user turns after completing the current turn.
148 pub input_tx: mpsc::Sender<AgentInput>,
149 /// Final run state (sent once when the agent completes).
150 pub state_rx: oneshot::Receiver<AgentRunState>,
151 /// Cancel the running agent.
152 pub cancel_token: CancellationToken,
153}
154
155/// Configuration bundle for constructing an [`AgentLoop`] with compaction.
156pub struct AgentLoopCompactionConfig {
157 pub agent_config: AgentConfig,
158 pub compaction_config: CompactionConfig,
159}
160
161impl AgentLoopCompactionConfig {
162 #[must_use]
163 pub const fn new(agent_config: AgentConfig, compaction_config: CompactionConfig) -> Self {
164 Self {
165 agent_config,
166 compaction_config,
167 }
168 }
169}
170
171/// The main agent loop that orchestrates LLM calls and tool execution.
172///
173/// `AgentLoop` is the core component that:
174/// - Manages conversation state via message and state stores
175/// - Calls the LLM provider and processes responses
176/// - Executes tools through the tool registry
177/// - Persists events to the configured event store
178/// - Enforces hooks for tool permissions and lifecycle events
179///
180/// # Type Parameters
181///
182/// - `Ctx`: Application-specific context passed to tools (e.g., user ID, database)
183/// - `P`: The LLM provider implementation
184/// - `H`: The hooks implementation for lifecycle customization
185/// - `M`: The message store implementation
186/// - `S`: The state store implementation
187///
188/// # Event Storage
189///
190/// Every loop instance requires an [`EventStore`] configured at construction
191/// time. Events are written to that store for the entire lifecycle of the loop,
192/// and callers read them back from the store instead of receiving an in-process
193/// channel from the runtime.
194///
195/// # Running the Agent
196///
197/// ```ignore
198/// let final_state = agent.run(
199/// thread_id,
200/// AgentInput::Text("Hello!".to_string()),
201/// tool_ctx,
202/// );
203/// let state = final_state.await?;
204/// let events = event_store.get_events(&thread_id).await?;
205/// ```
206pub struct AgentLoop<Ctx, P, H, M, S>
207where
208 P: LlmProvider,
209 H: AgentHooks,
210 M: MessageStore,
211 S: StateStore,
212{
213 pub(super) provider: Arc<P>,
214 pub(super) tools: Arc<ToolRegistry<Ctx>>,
215 pub(super) hooks: Arc<H>,
216 pub(super) message_store: Arc<M>,
217 pub(super) state_store: Arc<S>,
218 pub(super) event_store: Arc<dyn EventStore>,
219 pub(super) event_authority: Option<Arc<dyn EventAuthority>>,
220 pub(super) config: AgentConfig,
221 pub(super) compaction_config: Option<CompactionConfig>,
222 pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
223 pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
224 pub(super) audit_sink: Arc<dyn crate::hooks::ToolAuditSink>,
225 #[cfg(feature = "otel")]
226 pub(super) observability_store: Option<Arc<dyn crate::observability::ObservabilityStore>>,
227}
228
229/// Create a new builder for constructing an `AgentLoop`.
230#[must_use]
231pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
232 AgentLoopBuilder::new()
233}
234
235impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
236where
237 Ctx: Send + Sync + 'static,
238 P: LlmProvider + 'static,
239 H: AgentHooks + 'static,
240 M: MessageStore + 'static,
241 S: StateStore + 'static,
242{
243 /// Create a new agent loop with all components specified directly.
244 #[must_use]
245 pub fn new(
246 provider: P,
247 tools: ToolRegistry<Ctx>,
248 hooks: H,
249 message_store: M,
250 state_store: S,
251 event_store: Arc<dyn EventStore>,
252 config: AgentConfig,
253 ) -> Self {
254 Self {
255 provider: Arc::new(provider),
256 tools: Arc::new(tools),
257 hooks: Arc::new(hooks),
258 message_store: Arc::new(message_store),
259 state_store: Arc::new(state_store),
260 event_store,
261 event_authority: None,
262 config,
263 compaction_config: None,
264 compactor: None,
265 execution_store: None,
266 audit_sink: Arc::new(crate::hooks::NoopAuditSink),
267 #[cfg(feature = "otel")]
268 observability_store: None,
269 }
270 }
271
272 /// Create a new agent loop with compaction enabled.
273 #[must_use]
274 pub fn with_compaction(
275 provider: P,
276 tools: ToolRegistry<Ctx>,
277 hooks: H,
278 message_store: M,
279 state_store: S,
280 event_store: Arc<dyn EventStore>,
281 config: AgentLoopCompactionConfig,
282 ) -> Self {
283 let AgentLoopCompactionConfig {
284 agent_config,
285 compaction_config,
286 } = config;
287 Self {
288 provider: Arc::new(provider),
289 tools: Arc::new(tools),
290 hooks: Arc::new(hooks),
291 message_store: Arc::new(message_store),
292 state_store: Arc::new(state_store),
293 event_store,
294 event_authority: None,
295 config: agent_config,
296 compaction_config: Some(compaction_config),
297 compactor: None,
298 execution_store: None,
299 audit_sink: Arc::new(crate::hooks::NoopAuditSink),
300 #[cfg(feature = "otel")]
301 observability_store: None,
302 }
303 }
304
305 /// Set the authoritative tool audit sink.
306 ///
307 /// When set, the loop emits a [`ToolAuditRecord`](crate::advanced::ToolAuditRecord)
308 /// at every tool-lifecycle transition (blocked, requires-confirmation,
309 /// cached, replayed, invalidated, completed, persistence-failed).
310 ///
311 /// The default is [`NoopAuditSink`](crate::hooks::NoopAuditSink) which
312 /// discards every record — suitable for local/CLI usage. Servers should
313 /// swap in a durable sink.
314 #[must_use]
315 pub fn with_audit_sink(mut self, sink: impl crate::hooks::ToolAuditSink + 'static) -> Self {
316 self.audit_sink = Arc::new(sink);
317 self
318 }
319
320 /// Set the observability store for `GenAI` payload capture.
321 ///
322 /// When set, the store is called at each LLM request boundary to decide
323 /// whether payloads are inlined on spans, externalized, or omitted.
324 #[cfg(feature = "otel")]
325 #[must_use]
326 pub fn with_observability_store(
327 mut self,
328 store: impl crate::observability::ObservabilityStore + 'static,
329 ) -> Self {
330 self.observability_store = Some(Arc::new(store));
331 self
332 }
333
334 /// Resolve the event authority for this run.
335 ///
336 /// If an external authority was configured via the builder, use it.
337 /// Otherwise create a fresh [`LocalEventAuthority`] that starts at 0
338 /// (the pre-existing local/CLI behaviour).
339 fn resolve_authority(&self) -> Arc<dyn EventAuthority> {
340 self.event_authority
341 .clone()
342 .unwrap_or_else(|| Arc::new(LocalEventAuthority::new()))
343 }
344
345 /// Run the agent loop.
346 ///
347 /// This method allows the agent to pause when a tool requires confirmation,
348 /// returning an `AgentRunState::AwaitingConfirmation` that contains the
349 /// state needed to resume.
350 ///
351 /// When the `cancel_token` is cancelled, the agent will stop after the
352 /// current turn completes (no new turns will start). The final state will
353 /// be `AgentRunState::Cancelled`.
354 ///
355 /// # Arguments
356 ///
357 /// * `thread_id` - The thread identifier for this conversation
358 /// * `input` - Either a new text message or a resume with confirmation decision
359 /// * `tool_context` - Context passed to tools
360 /// * `cancel_token` - Token to signal cancellation from outside
361 ///
362 /// # Returns
363 ///
364 /// A future that resolves to the final [`AgentRunState`]. Awaiting it
365 /// drives the run to completion:
366 ///
367 /// ```ignore
368 /// let final_state = agent.run(thread_id, input, tool_ctx, cancel).await?;
369 /// ```
370 ///
371 /// The future is `'static` — the run is already spawned on a Tokio task
372 /// before this returns, so dropping the future does **not** stop the run
373 /// (use `cancel_token`). Awaiting it only waits for the result.
374 ///
375 /// # Example
376 ///
377 /// ```ignore
378 /// let cancel = CancellationToken::new();
379 /// let final_state = agent.run(
380 /// thread_id,
381 /// AgentInput::Text("Hello".to_string()),
382 /// tool_ctx,
383 /// cancel.clone(),
384 /// ).await?;
385 ///
386 /// match final_state {
387 /// AgentRunState::Done { .. } => { /* completed */ }
388 /// AgentRunState::Cancelled { .. } => { /* user cancelled */ }
389 /// AgentRunState::AwaitingConfirmation { continuation, .. } => {
390 /// // Get user decision, then resume:
391 /// let state2 = agent.run(
392 /// thread_id,
393 /// AgentInput::Resume {
394 /// continuation,
395 /// tool_call_id: id,
396 /// confirmed: true,
397 /// rejection_reason: None,
398 /// },
399 /// tool_ctx,
400 /// cancel.clone(),
401 /// ).await?;
402 /// }
403 /// AgentRunState::Error(e) => { /* handle error */ }
404 /// }
405 /// ```
406 /// # Cancellation, timeout, and the dropped `JoinHandle`
407 ///
408 /// `run` spawns the agent loop on a Tokio task and returns a future over
409 /// the state channel — it **drops** the task's
410 /// [`tokio::task::JoinHandle`]. Dropping a `JoinHandle` *detaches* the
411 /// task rather than aborting it, so the only ways to stop an in-flight
412 /// run are the `cancel_token` (cooperative) or the per-tool
413 /// [`AgentConfig::tool_timeout_ms`](crate::types::AgentConfig::tool_timeout_ms)
414 /// boundary. Callers that need to forcibly abort must use
415 /// [`run_abortable`](Self::run_abortable) and keep the handle.
416 ///
417 /// Because the handle is dropped, a tool that holds a subprocess open
418 /// must obey the
419 /// [cooperative-cancel contract](crate::tools::Tool#cooperative-cancellation)
420 /// (`kill_on_drop` or a token-aware `kill`) or the subprocess will
421 /// outlive the cancelled / timed-out run. A `debug!` is logged here so
422 /// the detach is visible when chasing a leaked subprocess.
423 ///
424 /// # Errors
425 ///
426 /// Returns an error only if the spawned run task is dropped before it can
427 /// report a final state (e.g. a runtime shutdown). A panic inside the run
428 /// is caught and surfaced as [`AgentRunState::Error`], not as an `Err`.
429 pub fn run(
430 &self,
431 thread_id: ThreadId,
432 input: AgentInput,
433 tool_context: ToolContext<Ctx>,
434 cancel_token: CancellationToken,
435 ) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
436 where
437 Ctx: Clone,
438 {
439 let (state_rx, handle) = self.run_abortable(thread_id, input, tool_context, cancel_token);
440 warn_on_detached_run_handle(handle);
441 recv_run_state(state_rx)
442 }
443
444 /// Like [`run`](Self::run), but with caller-supplied trace metadata.
445 ///
446 /// Equivalent to `run` except that the supplied [`RunOptions`]
447 /// configure session/user IDs (propagated as `session.id` /
448 /// `user.id` baggage), `langfuse.trace.{name,tags,metadata.*,
449 /// input,output}`, `langfuse.{release,environment}`, and the
450 /// trace-text truncation ceiling.
451 ///
452 /// Use this instead of `run` whenever the consumer needs the
453 /// SDK to populate Langfuse trace metadata; `run` itself
454 /// continues to delegate here with `RunOptions::default()`.
455 ///
456 /// # Errors
457 ///
458 /// See [`run`](Self::run).
459 pub fn run_with_options(
460 &self,
461 thread_id: ThreadId,
462 input: AgentInput,
463 tool_context: ToolContext<Ctx>,
464 cancel_token: CancellationToken,
465 run_options: RunOptions,
466 ) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
467 where
468 Ctx: Clone,
469 {
470 let (state_rx, handle) = self.run_abortable_with_options(
471 thread_id,
472 input,
473 tool_context,
474 cancel_token,
475 run_options,
476 );
477 warn_on_detached_run_handle(handle);
478 recv_run_state(state_rx)
479 }
480
481 /// Like [`run`](Self::run), but also returns the [`tokio::task::JoinHandle`] for the
482 /// spawned task.
483 ///
484 /// Callers that need to forcibly abort the agent loop (e.g. subagent
485 /// timeout) can call [`tokio::task::JoinHandle::abort`] on the returned handle.
486 /// Aborting the handle drops the in-flight LLM stream immediately
487 /// instead of waiting for the current turn to finish.
488 pub fn run_abortable(
489 &self,
490 thread_id: ThreadId,
491 input: AgentInput,
492 tool_context: ToolContext<Ctx>,
493 cancel_token: CancellationToken,
494 ) -> (
495 oneshot::Receiver<AgentRunState>,
496 tokio::task::JoinHandle<()>,
497 )
498 where
499 Ctx: Clone,
500 {
501 self.run_abortable_with_options(
502 thread_id,
503 input,
504 tool_context,
505 cancel_token,
506 RunOptions::default(),
507 )
508 }
509
510 /// Like [`run_abortable`](Self::run_abortable), but with
511 /// caller-supplied trace metadata. See [`run_with_options`](Self::run_with_options).
512 pub fn run_abortable_with_options(
513 &self,
514 thread_id: ThreadId,
515 input: AgentInput,
516 tool_context: ToolContext<Ctx>,
517 cancel_token: CancellationToken,
518 run_options: RunOptions,
519 ) -> (
520 oneshot::Receiver<AgentRunState>,
521 tokio::task::JoinHandle<()>,
522 )
523 where
524 Ctx: Clone,
525 {
526 // `run_options` only feeds OTel root-span metadata. On
527 // non-otel builds the value is genuinely not needed —
528 // explicitly drop it so the unused-variable / needless-pass
529 // lints stay quiet without us reaching for an
530 // `#[allow(...)]`.
531 #[cfg(not(feature = "otel"))]
532 drop(run_options);
533
534 let (state_tx, state_rx) = oneshot::channel();
535 let authority = self.resolve_authority();
536
537 let provider = Arc::clone(&self.provider);
538 let tools = Arc::clone(&self.tools);
539 let hooks = Arc::clone(&self.hooks);
540 let message_store = Arc::clone(&self.message_store);
541 let state_store = Arc::clone(&self.state_store);
542 let event_store = Arc::clone(&self.event_store);
543 let config = self.config.clone();
544 let compaction_config = self.compaction_config.clone();
545 let compactor = self.compactor.clone();
546 let execution_store = self.execution_store.clone();
547 let audit_sink = Arc::clone(&self.audit_sink);
548 #[cfg(feature = "otel")]
549 let observability_store = self.observability_store.clone();
550 #[cfg(feature = "otel")]
551 let parent_cx = crate::observability::context::capture_context();
552
553 let task = async move {
554 let result = run_loop_isolated(RunLoopParameters {
555 event_store,
556 authority,
557 thread_id,
558 input,
559 tool_context,
560 provider,
561 tools,
562 hooks,
563 message_store,
564 state_store,
565 config,
566 compaction_config,
567 compactor,
568 execution_store,
569 audit_sink,
570 cancel_token,
571 input_rx: None,
572 #[cfg(feature = "otel")]
573 run_options,
574 #[cfg(feature = "otel")]
575 observability_store,
576 })
577 .await;
578
579 let _ = state_tx.send(result);
580 };
581
582 #[cfg(feature = "otel")]
583 let task = {
584 use opentelemetry::trace::FutureExt;
585 task.with_context(parent_cx)
586 };
587
588 let handle = tokio::spawn(task);
589
590 (state_rx, handle)
591 }
592
593 /// Run the agent with a persistent input channel.
594 ///
595 /// Unlike [`Self::run`], this returns an [`AgentHandle`] that allows the caller
596 /// to inject new user messages into the running agent via `input_tx`.
597 /// The agent will process the initial input, then wait for new messages
598 /// on the channel between turns instead of exiting on `Done`.
599 ///
600 /// The agent exits when:
601 /// - The `input_tx` sender is dropped (no more messages)
602 /// - The `cancel_token` is cancelled
603 /// - Max turns exceeded
604 pub fn run_persistent(
605 &self,
606 thread_id: ThreadId,
607 input: AgentInput,
608 tool_context: ToolContext<Ctx>,
609 cancel_token: CancellationToken,
610 ) -> AgentHandle
611 where
612 Ctx: Clone,
613 {
614 self.run_persistent_with_options(
615 thread_id,
616 input,
617 tool_context,
618 cancel_token,
619 RunOptions::default(),
620 )
621 }
622
623 /// Like [`run_persistent`](Self::run_persistent), but with
624 /// caller-supplied trace metadata. See
625 /// [`run_with_options`](Self::run_with_options).
626 pub fn run_persistent_with_options(
627 &self,
628 thread_id: ThreadId,
629 input: AgentInput,
630 tool_context: ToolContext<Ctx>,
631 cancel_token: CancellationToken,
632 run_options: RunOptions,
633 ) -> AgentHandle
634 where
635 Ctx: Clone,
636 {
637 // See `run_abortable_with_options` for why we explicitly
638 // drop `run_options` on non-otel builds.
639 #[cfg(not(feature = "otel"))]
640 drop(run_options);
641
642 let (state_tx, state_rx) = oneshot::channel();
643 let (input_tx, input_rx) = mpsc::channel(32);
644 let authority = self.resolve_authority();
645
646 let provider = Arc::clone(&self.provider);
647 let tools = Arc::clone(&self.tools);
648 let hooks = Arc::clone(&self.hooks);
649 let message_store = Arc::clone(&self.message_store);
650 let state_store = Arc::clone(&self.state_store);
651 let event_store = Arc::clone(&self.event_store);
652 let config = self.config.clone();
653 let compaction_config = self.compaction_config.clone();
654 let compactor = self.compactor.clone();
655 let execution_store = self.execution_store.clone();
656 let audit_sink = Arc::clone(&self.audit_sink);
657 #[cfg(feature = "otel")]
658 let observability_store = self.observability_store.clone();
659 let cancel_handle = cancel_token.clone();
660 #[cfg(feature = "otel")]
661 let parent_cx = crate::observability::context::capture_context();
662
663 let task = async move {
664 let result = run_loop_isolated(RunLoopParameters {
665 event_store,
666 authority,
667 thread_id,
668 input,
669 tool_context,
670 provider,
671 tools,
672 hooks,
673 message_store,
674 state_store,
675 config,
676 compaction_config,
677 compactor,
678 execution_store,
679 audit_sink,
680 cancel_token,
681 input_rx: Some(input_rx),
682 #[cfg(feature = "otel")]
683 run_options,
684 #[cfg(feature = "otel")]
685 observability_store,
686 })
687 .await;
688
689 let _ = state_tx.send(result);
690 };
691
692 #[cfg(feature = "otel")]
693 let task = {
694 use opentelemetry::trace::FutureExt;
695 task.with_context(parent_cx)
696 };
697
698 tokio::spawn(task);
699
700 AgentHandle {
701 input_tx,
702 state_rx,
703 cancel_token: cancel_handle,
704 }
705 }
706
707 /// Run a single turn of the agent loop — the authoritative server boundary.
708 ///
709 /// Unlike `run()`, this method executes exactly one turn **directly in the
710 /// caller's task** (no `tokio::spawn`) and returns the result inline. This
711 /// enables external orchestration where each turn can be dispatched as a
712 /// separate message (e.g., via Artemis or another message queue).
713 ///
714 /// When the `cancel_token` is cancelled, the turn will be aborted before
715 /// starting execution and return `TurnOutcome::Cancelled`.
716 ///
717 /// # Arguments
718 ///
719 /// * `thread_id` - The thread identifier for this conversation
720 /// * `input` - Text to start, Resume after confirmation, or Continue after a turn
721 /// * `tool_context` - Context passed to tools
722 /// * `cancel_token` - Token to signal cancellation from outside
723 /// * `options` - Execution options (tool runtime strategy, durability)
724 ///
725 /// # Returns
726 ///
727 /// A [`crate::types::TurnOutcome`] returned only after the configured event store's
728 /// `finish_turn(thread_id, turn)` barrier has completed.
729 ///
730 /// Every variant except [`crate::types::TurnOutcome::Error`] carries a
731 /// structured [`crate::types::TurnSummary`] in the `summary` field. This
732 /// summary is the **authoritative** server-facing outcome contract —
733 /// it contains the provider/model provenance, response ID, stop reason,
734 /// tool-call count, duration, and execution options for the turn. Server
735 /// code should read from `summary` rather than the legacy per-variant
736 /// fields (`total_turns`, `input_tokens`, `output_tokens`, …), which are
737 /// retained only for backwards compatibility with local callers.
738 ///
739 /// # Turn Outcomes
740 ///
741 /// - `NeedsMoreTurns` - Turn completed, call again with `AgentInput::Continue`
742 /// - `Done` - Agent completed successfully
743 /// - `AwaitingConfirmation` - Tool needs confirmation, call again with `AgentInput::Resume`
744 /// - `PendingToolCalls` - Tools need external execution (only with `ToolRuntime::External`)
745 /// - `Cancelled` - Turn was cancelled via the token
746 /// - `Error` - An error occurred (no summary attached)
747 ///
748 /// # Example
749 ///
750 /// ```ignore
751 /// use std::sync::Arc;
752 /// use agent_sdk::{InMemoryEventStore, TurnOptions};
753 ///
754 /// let cancel = CancellationToken::new();
755 /// let event_store = Arc::new(InMemoryEventStore::new());
756 /// let outcome = agent.run_turn(
757 /// thread_id.clone(),
758 /// AgentInput::Text("What is 2+2?".to_string()),
759 /// tool_ctx.clone(),
760 /// cancel,
761 /// TurnOptions::default(),
762 /// ).await;
763 ///
764 /// let events = event_store.get_events(&thread_id).await?;
765 ///
766 /// // Read server-facing metadata from the TurnSummary.
767 /// if let Some(summary) = outcome.summary() {
768 /// println!(
769 /// "turn={} provider={} model={} stop={:?} response_id={:?}",
770 /// summary.turn,
771 /// summary.provenance.provider,
772 /// summary.provenance.model,
773 /// summary.stop_reason,
774 /// summary.response_id,
775 /// );
776 /// }
777 ///
778 /// // Branch on the variant for flow control.
779 /// match outcome {
780 /// TurnOutcome::NeedsMoreTurns { turn, .. } => {
781 /// // Dispatch another message to continue
782 /// }
783 /// TurnOutcome::Done { .. } => {
784 /// // Conversation complete
785 /// }
786 /// TurnOutcome::PendingToolCalls { tool_calls, .. } => {
787 /// // Execute tools externally, then call run_turn with Continue
788 /// }
789 /// _ => {}
790 /// }
791 /// ```
792 pub async fn run_turn(
793 &self,
794 thread_id: ThreadId,
795 input: AgentInput,
796 tool_context: ToolContext<Ctx>,
797 cancel_token: CancellationToken,
798 options: TurnOptions,
799 ) -> crate::types::TurnOutcome
800 where
801 Ctx: Clone,
802 {
803 self.run_turn_with_options(
804 thread_id,
805 input,
806 tool_context,
807 cancel_token,
808 options,
809 RunOptions::default(),
810 )
811 .await
812 }
813
814 /// Like [`run_turn`](Self::run_turn), but with caller-supplied
815 /// trace metadata.
816 ///
817 /// See [`run_with_options`](Self::run_with_options) for the full
818 /// [`RunOptions`] contract. The `turn_options` parameter retains
819 /// its existing semantics (tool runtime / strict durability);
820 /// `run_options` is layered on top to populate Langfuse trace
821 /// metadata on the root `invoke_agent` span.
822 pub async fn run_turn_with_options(
823 &self,
824 thread_id: ThreadId,
825 input: AgentInput,
826 tool_context: ToolContext<Ctx>,
827 cancel_token: CancellationToken,
828 turn_options: TurnOptions,
829 run_options: RunOptions,
830 ) -> crate::types::TurnOutcome
831 where
832 Ctx: Clone,
833 {
834 // See `run_abortable_with_options` for why we explicitly
835 // drop `run_options` on non-otel builds.
836 #[cfg(not(feature = "otel"))]
837 drop(run_options);
838
839 let authority = self.resolve_authority();
840
841 run_single_turn(TurnParameters {
842 event_store: Arc::clone(&self.event_store),
843 authority,
844 thread_id,
845 input,
846 tool_context,
847 provider: Arc::clone(&self.provider),
848 tools: Arc::clone(&self.tools),
849 hooks: Arc::clone(&self.hooks),
850 message_store: Arc::clone(&self.message_store),
851 state_store: Arc::clone(&self.state_store),
852 config: self.config.clone(),
853 compaction_config: self.compaction_config.clone(),
854 compactor: self.compactor.clone(),
855 execution_store: self.execution_store.clone(),
856 audit_sink: Arc::clone(&self.audit_sink),
857 cancel_token,
858 turn_options,
859 #[cfg(feature = "otel")]
860 run_options,
861 #[cfg(feature = "otel")]
862 observability_store: self.observability_store.clone(),
863 })
864 .await
865 }
866}
867
868/// High-level convenience API for agents whose tools take no application
869/// context (`Ctx = ()`).
870///
871/// [`ask`](Self::ask) and [`send`](Self::send) collapse the four pieces of
872/// ceremony in the low-level [`run`](Self::run) path — constructing a
873/// [`ToolContext::new(())`](crate::ToolContext::new), creating a
874/// [`CancellationToken`], awaiting the run, and reassembling the assistant
875/// text out of the event store — into a single call that returns a `String`.
876///
877/// Reach for [`run`](Self::run) or [`run_turn`](Self::run_turn) when you need
878/// application context, cancellation, confirmation flow, or access to the raw
879/// [`AgentRunState`].
880impl<P, H, M, S> AgentLoop<(), P, H, M, S>
881where
882 P: LlmProvider + 'static,
883 H: AgentHooks + 'static,
884 M: MessageStore + 'static,
885 S: StateStore + 'static,
886{
887 /// Ask the agent a question and return its assembled reply.
888 ///
889 /// This is the 30-second on-ramp: it builds a fresh
890 /// [`ToolContext::new(())`](crate::ToolContext::new) and a
891 /// [`CancellationToken`] internally, runs the agent to completion, and
892 /// returns the assistant text emitted during this call concatenated into
893 /// one `String`.
894 ///
895 /// For confirmation flows, application context, or explicit cancellation,
896 /// use [`run`](Self::run) directly.
897 ///
898 /// # Errors
899 ///
900 /// Returns an error if the run task is dropped before reporting a state,
901 /// if the run ends in [`AgentRunState::Error`], or if the event store
902 /// cannot be read back.
903 pub async fn ask(
904 &self,
905 thread_id: ThreadId,
906 text: impl Into<String>,
907 ) -> anyhow::Result<String> {
908 self.send(thread_id, AgentInput::Text(text.into())).await
909 }
910
911 /// Send an [`AgentInput`] to the agent and return its assembled reply.
912 ///
913 /// Like [`ask`](Self::ask) but accepts a full [`AgentInput`] (e.g. to
914 /// resume after confirmation). Builds the
915 /// [`ToolContext`](crate::ToolContext) and [`CancellationToken`]
916 /// internally and returns the assistant text emitted during this call.
917 ///
918 /// # Errors
919 ///
920 /// Returns an error if the run task is dropped before reporting a state,
921 /// if the run ends in [`AgentRunState::Error`], or if the event store
922 /// cannot be read back.
923 pub async fn send(&self, thread_id: ThreadId, input: AgentInput) -> anyhow::Result<String> {
924 use crate::events::AgentEvent;
925
926 // Snapshot the existing event count so we only assemble text emitted
927 // by this call, not earlier turns persisted on the same thread.
928 let baseline = self.event_store.get_events(&thread_id).await?.len();
929
930 let state = self
931 .run(
932 thread_id.clone(),
933 input,
934 ToolContext::new(()),
935 CancellationToken::new(),
936 )
937 .await?;
938
939 if let AgentRunState::Error(error) = state {
940 return Err(anyhow::Error::new(error));
941 }
942
943 let events = self.event_store.get_events(&thread_id).await?;
944 let reply = events
945 .into_iter()
946 .skip(baseline)
947 .filter_map(|envelope| match envelope.event {
948 AgentEvent::Text { text, .. } => Some(text),
949 _ => None,
950 })
951 .collect::<String>();
952
953 Ok(reply)
954 }
955}