agent_sdk/agent_loop.rs
1//! Agent loop orchestration module.
2//!
3//! This module contains the core agent loop that orchestrates LLM calls,
4//! tool execution, and event handling. The agent loop is the main entry point
5//! for running an AI agent.
6//!
7//! # Architecture
8//!
9//! The agent loop works as follows:
10//! 1. Receives a user message
11//! 2. Sends the message to the LLM provider
12//! 3. Processes the LLM response (text or tool calls)
13//! 4. If tool calls are present, executes them and feeds results back to LLM
14//! 5. Repeats until the LLM responds with only text (no tool calls)
15//! 6. Persists events throughout to the configured event store
16//!
17//! # Building an Agent
18//!
19//! Use the builder pattern via [`builder()`] or [`AgentLoopBuilder`]:
20//!
21//! ```ignore
22//! use agent_sdk::{builder, providers::AnthropicProvider};
23//!
24//! let agent = builder()
25//! .provider(AnthropicProvider::sonnet(api_key))
26//! .tools(my_tools)
27//! .event_store(event_store)
28//! .build();
29//! ```
30
31mod builder;
32mod helpers;
33mod idempotency;
34mod listen;
35mod llm;
36mod run_loop;
37#[cfg(test)]
38mod test_utils;
39#[cfg(test)]
40mod tests;
41mod tool_execution;
42mod turn;
43mod types;
44
45use self::run_loop::{run_loop, run_single_turn};
46use self::types::{RunLoopParameters, TurnParameters};
47use crate::types::TurnOptions;
48
49pub use self::builder::AgentLoopBuilder;
50
51use crate::authority::{EventAuthority, LocalEventAuthority};
52use crate::context::{CompactionConfig, ContextCompactor};
53use crate::hooks::AgentHooks;
54use crate::llm::LlmProvider;
55use crate::stores::{EventStore, MessageStore, StateStore, ToolExecutionStore};
56use crate::tools::{ToolContext, ToolRegistry};
57use crate::types::{AgentConfig, AgentError, AgentInput, AgentRunState, RunOptions, ThreadId};
58use futures::FutureExt;
59use std::future::Future;
60use std::panic::AssertUnwindSafe;
61use std::sync::Arc;
62use tokio::sync::{mpsc, oneshot};
63use tokio_util::sync::CancellationToken;
64
65/// Run the agent loop with panic isolation at the spawned-task boundary.
66///
67/// A panic anywhere inside the run loop — most importantly inside the
68/// LLM provider call or the compaction provider call, neither of which
69/// is otherwise guarded — would unwind the spawned task, drop the
70/// `state_tx` oneshot, and surface to the caller as an opaque
71/// [`oneshot::error::RecvError`] (and, for subagents, be misclassified
72/// as `Disconnected`). Catching the unwind here turns it into a
73/// structured [`AgentRunState::Error`] that the task can still send on
74/// `state_tx`, so the run ends observably rather than silently tearing
75/// down the channel.
76///
77/// `AssertUnwindSafe` is sound at this boundary: the run loop owns its
78/// `RunLoopParameters` by value, so nothing it touches outlives the
79/// caught panic and is observed afterwards. The only value produced is
80/// the returned `AgentRunState`. Tool-level panics are already caught
81/// closer to the tool boundary (see
82/// `agent_loop::helpers::catch_tool_panic`) so the assistant
83/// `tool_use` / `tool_result` history stays balanced; this guard is the
84/// outer safety net for everything else.
85async fn run_loop_isolated<Ctx, P, H, M, S>(
86 params: RunLoopParameters<Ctx, P, H, M, S>,
87) -> AgentRunState
88where
89 Ctx: Send + Sync + Clone + 'static,
90 P: LlmProvider,
91 H: AgentHooks,
92 M: MessageStore,
93 S: StateStore,
94{
95 match AssertUnwindSafe(run_loop(params)).catch_unwind().await {
96 Ok(state) => state,
97 Err(payload) => {
98 let message = self::helpers::panic_payload_message(payload.as_ref());
99 log::error!("agent run loop panicked: {message}");
100 AgentRunState::Error(AgentError::new(
101 format!("Agent run panicked: {message}"),
102 false,
103 ))
104 }
105 }
106}
107
108/// Drop a spawned run task's [`tokio::task::JoinHandle`], logging a
109/// `debug!` to make the detach visible.
110///
111/// `run` / `run_with_options` intentionally drop the handle: the run is
112/// stopped through the cancel token or the per-tool timeout, not by
113/// aborting the task. Surfacing the detach at `debug` level gives a
114/// breadcrumb when a subprocess-backed tool that ignores the
115/// cooperative-cancel contract leaks a process after cancellation.
116fn warn_on_detached_run_handle(handle: tokio::task::JoinHandle<()>) {
117 log::debug!(
118 "agent run JoinHandle dropped (task detached); the run can only be \
119 stopped via its cancel token or per-tool timeout. Subprocess-backed \
120 tools must honour kill_on_drop or a token-aware kill to avoid leaks"
121 );
122 drop(handle);
123}
124
125/// Await a run's state receiver, mapping a dropped channel to an `anyhow`
126/// error so `run`/`run_with_options` can present an `impl Future` instead of
127/// a bare [`oneshot::Receiver`].
128///
129/// A panic inside the run is already converted to
130/// [`AgentRunState::Error`] before the channel send (see
131/// [`run_loop_isolated`]), so the only way `recv` fails is the sender being
132/// dropped without sending — a runtime shutdown rather than an agent error.
133async fn recv_run_state(
134 state_rx: oneshot::Receiver<AgentRunState>,
135) -> anyhow::Result<AgentRunState> {
136 state_rx
137 .await
138 .map_err(|_| anyhow::anyhow!("agent run task was dropped before reporting a final state"))
139}
140
/// Handle to a persistent agent thread.
///
/// Returned by [`AgentLoop::run_persistent`]. Allows the caller to send
/// new messages to the running agent, observe its final state, and cancel
/// execution. The three fields are independent channel/token endpoints; the
/// handle itself does not own the spawned task.
pub struct AgentHandle {
    /// Send new messages to the running agent. The agent processes them as new
    /// user turns once it parks between turns.
    ///
    /// Only [`AgentInput::Text`] and [`AgentInput::Message`] are supported on
    /// this channel — they are the only variants that represent a fresh user
    /// turn. Injecting [`AgentInput::Resume`], [`AgentInput::SubmitToolResults`],
    /// or [`AgentInput::Continue`] ends the run with [`AgentRunState::Error`]
    /// (those belong to the single-turn `run_turn` flow, not the persistent
    /// loop). Dropping the sender ends the run cleanly with `Done`.
    ///
    /// The channel is bounded (created with a capacity of 32 messages), so
    /// `send().await` waits for free capacity when the agent falls behind.
    pub input_tx: mpsc::Sender<AgentInput>,
    /// Final run state (sent once when the agent completes).
    ///
    /// A panic inside the run is reported here as [`AgentRunState::Error`];
    /// the receiver itself only fails if the run task is dropped before it
    /// can send a state.
    pub state_rx: oneshot::Receiver<AgentRunState>,
    /// Cancel the running agent.
    ///
    /// This is a clone of the token passed to `run_persistent`, so cancelling
    /// either one cancels the run.
    pub cancel_token: CancellationToken,
}
161
/// Configuration bundle for constructing an [`AgentLoop`] with compaction.
///
/// Consumed by [`AgentLoop::with_compaction`], which destructures it into the
/// loop's agent configuration and its compaction configuration.
pub struct AgentLoopCompactionConfig {
    /// General agent configuration; stored as the loop's `config`.
    pub agent_config: AgentConfig,
    /// Compaction settings; stored as `Some(..)` in the loop's
    /// `compaction_config`.
    pub compaction_config: CompactionConfig,
}

impl AgentLoopCompactionConfig {
    /// Bundle an agent configuration together with a compaction configuration.
    ///
    /// Both values are moved into the bundle unchanged.
    #[must_use]
    pub const fn new(agent_config: AgentConfig, compaction_config: CompactionConfig) -> Self {
        Self {
            agent_config,
            compaction_config,
        }
    }
}
177
/// The main agent loop that orchestrates LLM calls and tool execution.
///
/// `AgentLoop` is the core component that:
/// - Manages conversation state via message and state stores
/// - Calls the LLM provider and processes responses
/// - Executes tools through the tool registry
/// - Persists events to the configured event store
/// - Enforces hooks for tool permissions and lifecycle events
///
/// # Type Parameters
///
/// - `Ctx`: Application-specific context passed to tools (e.g., user ID, database)
/// - `P`: The LLM provider implementation
/// - `H`: The hooks implementation for lifecycle customization
/// - `M`: The message store implementation
/// - `S`: The state store implementation
///
/// # Event Storage
///
/// Every loop instance requires an [`EventStore`] configured at construction
/// time. Events are written to that store for the entire lifecycle of the loop,
/// and callers read them back from the store instead of receiving an in-process
/// channel from the runtime.
///
/// # Running the Agent
///
/// ```ignore
/// let final_state = agent.run(
///     thread_id.clone(),
///     AgentInput::Text("Hello!".to_string()),
///     tool_ctx,
///     CancellationToken::new(),
/// );
/// let state = final_state.await?;
/// let events = event_store.get_events(&thread_id).await?;
/// ```
pub struct AgentLoop<Ctx, P, H, M, S>
where
    P: LlmProvider,
    H: AgentHooks,
    M: MessageStore,
    S: StateStore,
{
    /// LLM provider, shared with every spawned run.
    pub(super) provider: Arc<P>,
    /// Registry of tools available to the agent.
    pub(super) tools: Arc<ToolRegistry<Ctx>>,
    /// Lifecycle / permission hooks.
    pub(super) hooks: Arc<H>,
    /// Store holding the conversation messages.
    pub(super) message_store: Arc<M>,
    /// Store holding the agent state.
    pub(super) state_store: Arc<S>,
    /// Store every run writes its events to.
    pub(super) event_store: Arc<dyn EventStore>,
    /// Externally configured event authority. When `None`, each run creates a
    /// fresh [`LocalEventAuthority`] instead.
    pub(super) event_authority: Option<Arc<dyn EventAuthority>>,
    /// Agent configuration, cloned into each run.
    pub(super) config: AgentConfig,
    /// Compaction settings; `None` when the loop is built with `new`.
    pub(super) compaction_config: Option<CompactionConfig>,
    /// Optional context compactor; both constructors in this module leave it
    /// `None` (presumably set through the builder — confirm).
    pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
    /// Optional tool-execution store; both constructors in this module leave
    /// it `None` (presumably set through the builder — confirm).
    pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
    /// Sink for tool audit records; defaults to `NoopAuditSink`.
    pub(super) audit_sink: Arc<dyn crate::hooks::ToolAuditSink>,
    /// Optional observability store; only present with the `otel` feature.
    #[cfg(feature = "otel")]
    pub(super) observability_store: Option<Arc<dyn crate::observability::ObservabilityStore>>,
}
235
236/// Create a new builder for constructing an `AgentLoop`.
237#[must_use]
238pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
239 AgentLoopBuilder::new()
240}
241
242impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
243where
244 Ctx: Send + Sync + 'static,
245 P: LlmProvider + 'static,
246 H: AgentHooks + 'static,
247 M: MessageStore + 'static,
248 S: StateStore + 'static,
249{
250 /// Create a new agent loop with all components specified directly.
251 #[must_use]
252 pub fn new(
253 provider: P,
254 tools: ToolRegistry<Ctx>,
255 hooks: H,
256 message_store: M,
257 state_store: S,
258 event_store: Arc<dyn EventStore>,
259 config: AgentConfig,
260 ) -> Self {
261 Self {
262 provider: Arc::new(provider),
263 tools: Arc::new(tools),
264 hooks: Arc::new(hooks),
265 message_store: Arc::new(message_store),
266 state_store: Arc::new(state_store),
267 event_store,
268 event_authority: None,
269 config,
270 compaction_config: None,
271 compactor: None,
272 execution_store: None,
273 audit_sink: Arc::new(crate::hooks::NoopAuditSink),
274 #[cfg(feature = "otel")]
275 observability_store: None,
276 }
277 }
278
279 /// Create a new agent loop with compaction enabled.
280 #[must_use]
281 pub fn with_compaction(
282 provider: P,
283 tools: ToolRegistry<Ctx>,
284 hooks: H,
285 message_store: M,
286 state_store: S,
287 event_store: Arc<dyn EventStore>,
288 config: AgentLoopCompactionConfig,
289 ) -> Self {
290 let AgentLoopCompactionConfig {
291 agent_config,
292 compaction_config,
293 } = config;
294 Self {
295 provider: Arc::new(provider),
296 tools: Arc::new(tools),
297 hooks: Arc::new(hooks),
298 message_store: Arc::new(message_store),
299 state_store: Arc::new(state_store),
300 event_store,
301 event_authority: None,
302 config: agent_config,
303 compaction_config: Some(compaction_config),
304 compactor: None,
305 execution_store: None,
306 audit_sink: Arc::new(crate::hooks::NoopAuditSink),
307 #[cfg(feature = "otel")]
308 observability_store: None,
309 }
310 }
311
312 /// Set the authoritative tool audit sink.
313 ///
314 /// When set, the loop emits a [`ToolAuditRecord`](crate::advanced::ToolAuditRecord)
315 /// at every tool-lifecycle transition (blocked, requires-confirmation,
316 /// cached, replayed, invalidated, completed, persistence-failed).
317 ///
318 /// The default is [`NoopAuditSink`](crate::hooks::NoopAuditSink) which
319 /// discards every record — suitable for local/CLI usage. Servers should
320 /// swap in a durable sink.
321 #[must_use]
322 pub fn with_audit_sink(mut self, sink: impl crate::hooks::ToolAuditSink + 'static) -> Self {
323 self.audit_sink = Arc::new(sink);
324 self
325 }
326
327 /// Set the observability store for `GenAI` payload capture.
328 ///
329 /// When set, the store is called at each LLM request boundary to decide
330 /// whether payloads are inlined on spans, externalized, or omitted.
331 #[cfg(feature = "otel")]
332 #[must_use]
333 pub fn with_observability_store(
334 mut self,
335 store: impl crate::observability::ObservabilityStore + 'static,
336 ) -> Self {
337 self.observability_store = Some(Arc::new(store));
338 self
339 }
340
341 /// Resolve the event authority for this run.
342 ///
343 /// If an external authority was configured via the builder, use it.
344 /// Otherwise create a fresh [`LocalEventAuthority`] that starts at 0
345 /// (the pre-existing local/CLI behaviour).
346 fn resolve_authority(&self) -> Arc<dyn EventAuthority> {
347 self.event_authority
348 .clone()
349 .unwrap_or_else(|| Arc::new(LocalEventAuthority::new()))
350 }
351
352 /// Run the agent loop.
353 ///
354 /// This method allows the agent to pause when a tool requires confirmation,
355 /// returning an `AgentRunState::AwaitingConfirmation` that contains the
356 /// state needed to resume.
357 ///
358 /// When the `cancel_token` is cancelled, the agent interrupts in-flight
359 /// work at the SDK boundary: the LLM stream and any non-streaming LLM call
360 /// are raced against the token (see `agent_loop/llm.rs`), and in-flight
361 /// tool executions are dropped with balanced `ToolResult`s synthesized so
362 /// the conversation history stays consistent. The run then ends with
363 /// `AgentRunState::Cancelled` — cancellation is *not* deferred to a turn
364 /// boundary. Tools that hold OS resources must honour the
365 /// [cooperative-cancel contract](crate::tools::Tool#cooperative-cancellation);
366 /// see the section below.
367 ///
368 /// # Arguments
369 ///
370 /// * `thread_id` - The thread identifier for this conversation
371 /// * `input` - Either a new text message or a resume with confirmation decision
372 /// * `tool_context` - Context passed to tools
373 /// * `cancel_token` - Token to signal cancellation from outside
374 ///
375 /// # Returns
376 ///
    /// A future that resolves to the final [`AgentRunState`]. The run is
    /// driven by its own spawned task; awaiting the future waits for it to finish:
379 ///
380 /// ```ignore
381 /// let final_state = agent.run(thread_id, input, tool_ctx, cancel).await?;
382 /// ```
383 ///
384 /// The future is `'static` — the run is already spawned on a Tokio task
385 /// before this returns, so dropping the future does **not** stop the run
386 /// (use `cancel_token`). Awaiting it only waits for the result.
387 ///
388 /// # Example
389 ///
390 /// ```ignore
391 /// let cancel = CancellationToken::new();
392 /// let final_state = agent.run(
393 /// thread_id,
394 /// AgentInput::Text("Hello".to_string()),
395 /// tool_ctx,
396 /// cancel.clone(),
397 /// ).await?;
398 ///
399 /// match final_state {
400 /// AgentRunState::Done { .. } => { /* completed */ }
401 /// AgentRunState::Cancelled { .. } => { /* user cancelled */ }
402 /// AgentRunState::AwaitingConfirmation { continuation, .. } => {
403 /// // Get user decision, then resume:
404 /// let state2 = agent.run(
405 /// thread_id,
406 /// AgentInput::Resume {
407 /// continuation,
408 /// tool_call_id: id,
409 /// confirmed: true,
410 /// rejection_reason: None,
411 /// },
412 /// tool_ctx,
413 /// cancel.clone(),
414 /// ).await?;
415 /// }
416 /// AgentRunState::Error(e) => { /* handle error */ }
417 /// }
418 /// ```
419 /// # Cancellation, timeout, and the dropped `JoinHandle`
420 ///
421 /// `run` spawns the agent loop on a Tokio task and returns a future over
422 /// the state channel — it **drops** the task's
423 /// [`tokio::task::JoinHandle`]. Dropping a `JoinHandle` *detaches* the
424 /// task rather than aborting it, so the only ways to stop an in-flight
425 /// run are the `cancel_token` (cooperative) or the per-tool
426 /// [`AgentConfig::tool_timeout_ms`](crate::types::AgentConfig::tool_timeout_ms)
427 /// boundary. Callers that need to forcibly abort must use
428 /// [`run_abortable`](Self::run_abortable) and keep the handle.
429 ///
430 /// Because the handle is dropped, a tool that holds a subprocess open
431 /// must obey the
432 /// [cooperative-cancel contract](crate::tools::Tool#cooperative-cancellation)
433 /// (`kill_on_drop` or a token-aware `kill`) or the subprocess will
434 /// outlive the cancelled / timed-out run. A `debug!` is logged here so
435 /// the detach is visible when chasing a leaked subprocess.
436 ///
437 /// # Errors
438 ///
439 /// Returns an error only if the spawned run task is dropped before it can
440 /// report a final state (e.g. a runtime shutdown). A panic inside the run
441 /// is caught and surfaced as [`AgentRunState::Error`], not as an `Err`.
442 pub fn run(
443 &self,
444 thread_id: ThreadId,
445 input: AgentInput,
446 tool_context: ToolContext<Ctx>,
447 cancel_token: CancellationToken,
448 ) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
449 where
450 Ctx: Clone,
451 {
452 let (state_rx, handle) = self.run_abortable(thread_id, input, tool_context, cancel_token);
453 warn_on_detached_run_handle(handle);
454 recv_run_state(state_rx)
455 }
456
457 /// Like [`run`](Self::run), but with caller-supplied trace metadata.
458 ///
459 /// Equivalent to `run` except that the supplied [`RunOptions`]
460 /// configure session/user IDs (propagated as `session.id` /
461 /// `user.id` baggage), `langfuse.trace.{name,tags,metadata.*,
462 /// input,output}`, `langfuse.{release,environment}`, and the
463 /// trace-text truncation ceiling.
464 ///
465 /// Use this instead of `run` whenever the consumer needs the
466 /// SDK to populate Langfuse trace metadata; `run` itself
467 /// continues to delegate here with `RunOptions::default()`.
468 ///
469 /// # Errors
470 ///
471 /// See [`run`](Self::run).
472 pub fn run_with_options(
473 &self,
474 thread_id: ThreadId,
475 input: AgentInput,
476 tool_context: ToolContext<Ctx>,
477 cancel_token: CancellationToken,
478 run_options: RunOptions,
479 ) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
480 where
481 Ctx: Clone,
482 {
483 let (state_rx, handle) = self.run_abortable_with_options(
484 thread_id,
485 input,
486 tool_context,
487 cancel_token,
488 run_options,
489 );
490 warn_on_detached_run_handle(handle);
491 recv_run_state(state_rx)
492 }
493
494 /// Like [`run`](Self::run), but also returns the [`tokio::task::JoinHandle`] for the
495 /// spawned task.
496 ///
497 /// Callers that need to forcibly abort the agent loop (e.g. subagent
498 /// timeout) can call [`tokio::task::JoinHandle::abort`] on the returned handle.
499 /// Aborting the handle drops the in-flight LLM stream immediately
500 /// instead of waiting for the current turn to finish.
501 pub fn run_abortable(
502 &self,
503 thread_id: ThreadId,
504 input: AgentInput,
505 tool_context: ToolContext<Ctx>,
506 cancel_token: CancellationToken,
507 ) -> (
508 oneshot::Receiver<AgentRunState>,
509 tokio::task::JoinHandle<()>,
510 )
511 where
512 Ctx: Clone,
513 {
514 self.run_abortable_with_options(
515 thread_id,
516 input,
517 tool_context,
518 cancel_token,
519 RunOptions::default(),
520 )
521 }
522
523 /// Like [`run_abortable`](Self::run_abortable), but with
524 /// caller-supplied trace metadata. See [`run_with_options`](Self::run_with_options).
525 pub fn run_abortable_with_options(
526 &self,
527 thread_id: ThreadId,
528 input: AgentInput,
529 tool_context: ToolContext<Ctx>,
530 cancel_token: CancellationToken,
531 run_options: RunOptions,
532 ) -> (
533 oneshot::Receiver<AgentRunState>,
534 tokio::task::JoinHandle<()>,
535 )
536 where
537 Ctx: Clone,
538 {
539 // `run_options` only feeds OTel root-span metadata. On
540 // non-otel builds the value is genuinely not needed —
541 // explicitly drop it so the unused-variable / needless-pass
542 // lints stay quiet without us reaching for an
543 // `#[allow(...)]`.
544 #[cfg(not(feature = "otel"))]
545 drop(run_options);
546
547 let (state_tx, state_rx) = oneshot::channel();
548 let authority = self.resolve_authority();
549
550 let provider = Arc::clone(&self.provider);
551 let tools = Arc::clone(&self.tools);
552 let hooks = Arc::clone(&self.hooks);
553 let message_store = Arc::clone(&self.message_store);
554 let state_store = Arc::clone(&self.state_store);
555 let event_store = Arc::clone(&self.event_store);
556 let config = self.config.clone();
557 let compaction_config = self.compaction_config.clone();
558 let compactor = self.compactor.clone();
559 let execution_store = self.execution_store.clone();
560 let audit_sink = Arc::clone(&self.audit_sink);
561 #[cfg(feature = "otel")]
562 let observability_store = self.observability_store.clone();
563 #[cfg(feature = "otel")]
564 let parent_cx = crate::observability::context::capture_context();
565
566 let task = async move {
567 let result = run_loop_isolated(RunLoopParameters {
568 event_store,
569 authority,
570 thread_id,
571 input,
572 tool_context,
573 provider,
574 tools,
575 hooks,
576 message_store,
577 state_store,
578 config,
579 compaction_config,
580 compactor,
581 execution_store,
582 audit_sink,
583 cancel_token,
584 input_rx: None,
585 #[cfg(feature = "otel")]
586 run_options,
587 #[cfg(feature = "otel")]
588 observability_store,
589 })
590 .await;
591
592 let _ = state_tx.send(result);
593 };
594
595 #[cfg(feature = "otel")]
596 let task = {
597 use opentelemetry::trace::FutureExt;
598 task.with_context(parent_cx)
599 };
600
601 let handle = tokio::spawn(task);
602
603 (state_rx, handle)
604 }
605
606 /// Run the agent with a persistent input channel.
607 ///
608 /// Unlike [`Self::run`], this returns an [`AgentHandle`] that allows the caller
609 /// to inject new user messages into the running agent via `input_tx`.
610 /// The agent will process the initial input, then wait for new messages
611 /// on the channel between turns instead of exiting on `Done`.
612 ///
613 /// The agent exits when:
614 /// - The `input_tx` sender is dropped (no more messages) — reports `Done`
615 /// - The `cancel_token` is cancelled — reports `Cancelled`
616 /// - Max turns exceeded — reports `Error`
617 /// - An unsupported input variant is injected, or appending an injected
618 /// message fails — reports `Error` (see [`AgentHandle::input_tx`])
619 pub fn run_persistent(
620 &self,
621 thread_id: ThreadId,
622 input: AgentInput,
623 tool_context: ToolContext<Ctx>,
624 cancel_token: CancellationToken,
625 ) -> AgentHandle
626 where
627 Ctx: Clone,
628 {
629 self.run_persistent_with_options(
630 thread_id,
631 input,
632 tool_context,
633 cancel_token,
634 RunOptions::default(),
635 )
636 }
637
638 /// Like [`run_persistent`](Self::run_persistent), but with
639 /// caller-supplied trace metadata. See
640 /// [`run_with_options`](Self::run_with_options).
641 pub fn run_persistent_with_options(
642 &self,
643 thread_id: ThreadId,
644 input: AgentInput,
645 tool_context: ToolContext<Ctx>,
646 cancel_token: CancellationToken,
647 run_options: RunOptions,
648 ) -> AgentHandle
649 where
650 Ctx: Clone,
651 {
652 // See `run_abortable_with_options` for why we explicitly
653 // drop `run_options` on non-otel builds.
654 #[cfg(not(feature = "otel"))]
655 drop(run_options);
656
657 let (state_tx, state_rx) = oneshot::channel();
658 let (input_tx, input_rx) = mpsc::channel(32);
659 let authority = self.resolve_authority();
660
661 let provider = Arc::clone(&self.provider);
662 let tools = Arc::clone(&self.tools);
663 let hooks = Arc::clone(&self.hooks);
664 let message_store = Arc::clone(&self.message_store);
665 let state_store = Arc::clone(&self.state_store);
666 let event_store = Arc::clone(&self.event_store);
667 let config = self.config.clone();
668 let compaction_config = self.compaction_config.clone();
669 let compactor = self.compactor.clone();
670 let execution_store = self.execution_store.clone();
671 let audit_sink = Arc::clone(&self.audit_sink);
672 #[cfg(feature = "otel")]
673 let observability_store = self.observability_store.clone();
674 let cancel_handle = cancel_token.clone();
675 #[cfg(feature = "otel")]
676 let parent_cx = crate::observability::context::capture_context();
677
678 let task = async move {
679 let result = run_loop_isolated(RunLoopParameters {
680 event_store,
681 authority,
682 thread_id,
683 input,
684 tool_context,
685 provider,
686 tools,
687 hooks,
688 message_store,
689 state_store,
690 config,
691 compaction_config,
692 compactor,
693 execution_store,
694 audit_sink,
695 cancel_token,
696 input_rx: Some(input_rx),
697 #[cfg(feature = "otel")]
698 run_options,
699 #[cfg(feature = "otel")]
700 observability_store,
701 })
702 .await;
703
704 let _ = state_tx.send(result);
705 };
706
707 #[cfg(feature = "otel")]
708 let task = {
709 use opentelemetry::trace::FutureExt;
710 task.with_context(parent_cx)
711 };
712
713 tokio::spawn(task);
714
715 AgentHandle {
716 input_tx,
717 state_rx,
718 cancel_token: cancel_handle,
719 }
720 }
721
722 /// Run a single turn of the agent loop — the authoritative server boundary.
723 ///
724 /// Unlike `run()`, this method executes exactly one turn **directly in the
725 /// caller's task** (no `tokio::spawn`) and returns the result inline. This
726 /// enables external orchestration where each turn can be dispatched as a
727 /// separate message (e.g., via Artemis or another message queue).
728 ///
729 /// When the `cancel_token` is cancelled, the turn will be aborted before
730 /// starting execution and return `TurnOutcome::Cancelled`.
731 ///
732 /// # Arguments
733 ///
734 /// * `thread_id` - The thread identifier for this conversation
735 /// * `input` - Text to start, Resume after confirmation, or Continue after a turn
736 /// * `tool_context` - Context passed to tools
737 /// * `cancel_token` - Token to signal cancellation from outside
738 /// * `options` - Execution options (tool runtime strategy, durability)
739 ///
740 /// # Returns
741 ///
742 /// A [`crate::types::TurnOutcome`] returned only after the configured event store's
743 /// `finish_turn(thread_id, turn)` barrier has completed.
744 ///
745 /// Every variant except [`crate::types::TurnOutcome::Error`] carries a
746 /// structured [`crate::types::TurnSummary`] in the `summary` field. This
747 /// summary is the **authoritative** server-facing outcome contract —
748 /// it contains the provider/model provenance, response ID, stop reason,
749 /// tool-call count, duration, and execution options for the turn. Server
750 /// code should read from `summary` rather than the legacy per-variant
751 /// fields (`total_turns`, `input_tokens`, `output_tokens`, …), which are
752 /// retained only for backwards compatibility with local callers.
753 ///
754 /// # Turn Outcomes
755 ///
756 /// - `NeedsMoreTurns` - Turn completed, call again with `AgentInput::Continue`
757 /// - `Done` - Agent completed successfully
758 /// - `AwaitingConfirmation` - Tool needs confirmation, call again with `AgentInput::Resume`
759 /// - `PendingToolCalls` - Tools need external execution (only with `ToolRuntime::External`)
760 /// - `Cancelled` - Turn was cancelled via the token
761 /// - `Error` - An error occurred (no summary attached)
762 ///
763 /// # Example
764 ///
765 /// ```ignore
766 /// use std::sync::Arc;
767 /// use agent_sdk::{InMemoryEventStore, TurnOptions};
768 ///
769 /// let cancel = CancellationToken::new();
770 /// let event_store = Arc::new(InMemoryEventStore::new());
771 /// let outcome = agent.run_turn(
772 /// thread_id.clone(),
773 /// AgentInput::Text("What is 2+2?".to_string()),
774 /// tool_ctx.clone(),
775 /// cancel,
776 /// TurnOptions::default(),
777 /// ).await;
778 ///
779 /// let events = event_store.get_events(&thread_id).await?;
780 ///
781 /// // Read server-facing metadata from the TurnSummary.
782 /// if let Some(summary) = outcome.summary() {
783 /// println!(
784 /// "turn={} provider={} model={} stop={:?} response_id={:?}",
785 /// summary.turn,
786 /// summary.provenance.provider,
787 /// summary.provenance.model,
788 /// summary.stop_reason,
789 /// summary.response_id,
790 /// );
791 /// }
792 ///
793 /// // Branch on the variant for flow control.
794 /// match outcome {
795 /// TurnOutcome::NeedsMoreTurns { turn, .. } => {
796 /// // Dispatch another message to continue
797 /// }
798 /// TurnOutcome::Done { .. } => {
799 /// // Conversation complete
800 /// }
801 /// TurnOutcome::PendingToolCalls { tool_calls, .. } => {
802 /// // Execute tools externally, then call run_turn with Continue
803 /// }
804 /// _ => {}
805 /// }
806 /// ```
807 pub async fn run_turn(
808 &self,
809 thread_id: ThreadId,
810 input: AgentInput,
811 tool_context: ToolContext<Ctx>,
812 cancel_token: CancellationToken,
813 options: TurnOptions,
814 ) -> crate::types::TurnOutcome
815 where
816 Ctx: Clone,
817 {
818 self.run_turn_with_options(
819 thread_id,
820 input,
821 tool_context,
822 cancel_token,
823 options,
824 RunOptions::default(),
825 )
826 .await
827 }
828
829 /// Like [`run_turn`](Self::run_turn), but with caller-supplied
830 /// trace metadata.
831 ///
832 /// See [`run_with_options`](Self::run_with_options) for the full
833 /// [`RunOptions`] contract. The `turn_options` parameter retains
834 /// its existing semantics (tool runtime / strict durability);
835 /// `run_options` is layered on top to populate Langfuse trace
836 /// metadata on the root `invoke_agent` span.
837 pub async fn run_turn_with_options(
838 &self,
839 thread_id: ThreadId,
840 input: AgentInput,
841 tool_context: ToolContext<Ctx>,
842 cancel_token: CancellationToken,
843 turn_options: TurnOptions,
844 run_options: RunOptions,
845 ) -> crate::types::TurnOutcome
846 where
847 Ctx: Clone,
848 {
849 // See `run_abortable_with_options` for why we explicitly
850 // drop `run_options` on non-otel builds.
851 #[cfg(not(feature = "otel"))]
852 drop(run_options);
853
854 let authority = self.resolve_authority();
855
856 run_single_turn(TurnParameters {
857 event_store: Arc::clone(&self.event_store),
858 authority,
859 thread_id,
860 input,
861 tool_context,
862 provider: Arc::clone(&self.provider),
863 tools: Arc::clone(&self.tools),
864 hooks: Arc::clone(&self.hooks),
865 message_store: Arc::clone(&self.message_store),
866 state_store: Arc::clone(&self.state_store),
867 config: self.config.clone(),
868 compaction_config: self.compaction_config.clone(),
869 compactor: self.compactor.clone(),
870 execution_store: self.execution_store.clone(),
871 audit_sink: Arc::clone(&self.audit_sink),
872 cancel_token,
873 turn_options,
874 #[cfg(feature = "otel")]
875 run_options,
876 #[cfg(feature = "otel")]
877 observability_store: self.observability_store.clone(),
878 })
879 .await
880 }
881}
882
883/// High-level convenience API for agents whose tools take no application
884/// context (`Ctx = ()`).
885///
886/// [`ask`](Self::ask) and [`send`](Self::send) collapse the four pieces of
887/// ceremony in the low-level [`run`](Self::run) path — constructing a
888/// [`ToolContext::new(())`](crate::ToolContext::new), creating a
889/// [`CancellationToken`], awaiting the run, and reassembling the assistant
890/// text out of the event store — into a single call that returns a `String`.
891///
892/// Reach for [`run`](Self::run) or [`run_turn`](Self::run_turn) when you need
893/// application context, cancellation, confirmation flow, or access to the raw
894/// [`AgentRunState`].
895impl<P, H, M, S> AgentLoop<(), P, H, M, S>
896where
897 P: LlmProvider + 'static,
898 H: AgentHooks + 'static,
899 M: MessageStore + 'static,
900 S: StateStore + 'static,
901{
902 /// Ask the agent a question and return its assembled reply.
903 ///
904 /// This is the 30-second on-ramp: it builds a fresh
905 /// [`ToolContext::new(())`](crate::ToolContext::new) and a
906 /// [`CancellationToken`] internally, runs the agent to completion, and
907 /// returns the assistant text emitted during this call concatenated into
908 /// one `String`.
909 ///
910 /// For confirmation flows, application context, or explicit cancellation,
911 /// use [`run`](Self::run) directly.
912 ///
913 /// # Errors
914 ///
915 /// Returns an error if the run task is dropped before reporting a state,
916 /// if the run ends in [`AgentRunState::Error`], or if the event store
917 /// cannot be read back.
918 pub async fn ask(
919 &self,
920 thread_id: ThreadId,
921 text: impl Into<String>,
922 ) -> anyhow::Result<String> {
923 self.send(thread_id, AgentInput::Text(text.into())).await
924 }
925
926 /// Send an [`AgentInput`] to the agent and return its assembled reply.
927 ///
928 /// Like [`ask`](Self::ask) but accepts a full [`AgentInput`] (e.g. to
929 /// resume after confirmation). Builds the
930 /// [`ToolContext`](crate::ToolContext) and [`CancellationToken`]
931 /// internally and returns the assistant text emitted during this call.
932 ///
933 /// # Errors
934 ///
935 /// Returns an error if the run task is dropped before reporting a state,
936 /// if the run ends in [`AgentRunState::Error`], or if the event store
937 /// cannot be read back.
938 pub async fn send(&self, thread_id: ThreadId, input: AgentInput) -> anyhow::Result<String> {
939 use crate::events::AgentEvent;
940
941 // Snapshot the existing event count so we only assemble text emitted
942 // by this call, not earlier turns persisted on the same thread.
943 // `event_count` + `get_events_since` avoid materializing (and cloning)
944 // the entire thread history twice per call — repeated sends on a
945 // long-lived thread would otherwise be O(n^2) cumulative.
946 let baseline = self.event_store.event_count(&thread_id).await?;
947
948 let state = self
949 .run(
950 thread_id.clone(),
951 input,
952 ToolContext::new(()),
953 CancellationToken::new(),
954 )
955 .await?;
956
957 if let AgentRunState::Error(error) = state {
958 return Err(anyhow::Error::new(error));
959 }
960
961 let events = self
962 .event_store
963 .get_events_since(&thread_id, baseline)
964 .await?;
965 let reply = events
966 .into_iter()
967 .filter_map(|envelope| match envelope.event {
968 AgentEvent::Text { text, .. } => Some(text),
969 _ => None,
970 })
971 .collect::<String>();
972
973 Ok(reply)
974 }
975}