mod builder;
mod helpers;
mod idempotency;
mod listen;
mod llm;
mod run_loop;
#[cfg(test)]
mod test_utils;
#[cfg(test)]
mod tests;
mod tool_execution;
mod turn;
mod types;
use self::run_loop::{run_loop, run_single_turn};
use self::types::{RunLoopParameters, TurnParameters};
use crate::types::TurnOptions;
pub use self::builder::AgentLoopBuilder;
use crate::authority::{EventAuthority, LocalEventAuthority};
use crate::context::{CompactionConfig, ContextCompactor};
use crate::hooks::AgentHooks;
use crate::llm::LlmProvider;
use crate::stores::{EventStore, MessageStore, StateStore, ToolExecutionStore};
use crate::tools::{ToolContext, ToolRegistry};
use crate::types::{AgentConfig, AgentError, AgentInput, AgentRunState, RunOptions, ThreadId};
use futures::FutureExt;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
async fn run_loop_isolated<Ctx, P, H, M, S>(
params: RunLoopParameters<Ctx, P, H, M, S>,
) -> AgentRunState
where
Ctx: Send + Sync + Clone + 'static,
P: LlmProvider,
H: AgentHooks,
M: MessageStore,
S: StateStore,
{
match AssertUnwindSafe(run_loop(params)).catch_unwind().await {
Ok(state) => state,
Err(payload) => {
let message = self::helpers::panic_payload_message(payload.as_ref());
log::error!("agent run loop panicked: {message}");
AgentRunState::Error(AgentError::new(
format!("Agent run panicked: {message}"),
false,
))
}
}
}
/// Logs (at debug level) that a run's `JoinHandle` is being discarded, then
/// discards it.
///
/// Dropping a tokio `JoinHandle` detaches the task rather than aborting it,
/// which is what the logged message spells out for operators.
fn warn_on_detached_run_handle(handle: tokio::task::JoinHandle<()>) {
    log::debug!(
        "agent run JoinHandle dropped (task detached); \
         the run can only be stopped via its cancel token or per-tool timeout. \
         Subprocess-backed tools must honour kill_on_drop or a token-aware kill \
         to avoid leaks"
    );
    // Taking ownership here means the handle is released when this function
    // returns, i.e. after the message above has been emitted.
    let _detached = handle;
}
/// Waits for the final state of a spawned run.
///
/// # Errors
///
/// Returns an error when the sending half of the channel was dropped without
/// sending a value.
async fn recv_run_state(
    state_rx: oneshot::Receiver<AgentRunState>,
) -> anyhow::Result<AgentRunState> {
    match state_rx.await {
        Ok(final_state) => Ok(final_state),
        Err(_closed) => Err(anyhow::anyhow!(
            "agent run task was dropped before reporting a final state"
        )),
    }
}
/// Handle to a run started with `run_persistent` / `run_persistent_with_options`.
pub struct AgentHandle {
/// Sending half of the input channel; the receiving half is handed to the
/// run loop as its `input_rx`.
pub input_tx: mpsc::Sender<AgentInput>,
/// Receives the final [`AgentRunState`] that the spawned task sends once
/// the run loop returns.
pub state_rx: oneshot::Receiver<AgentRunState>,
/// Clone of the cancellation token that was passed to the run.
pub cancel_token: CancellationToken,
}
/// Pair of configurations consumed by `AgentLoop::with_compaction`.
pub struct AgentLoopCompactionConfig {
/// Becomes the agent loop's `config`.
pub agent_config: AgentConfig,
/// Becomes the agent loop's `compaction_config` (stored as `Some`).
pub compaction_config: CompactionConfig,
}
impl AgentLoopCompactionConfig {
#[must_use]
pub const fn new(agent_config: AgentConfig, compaction_config: CompactionConfig) -> Self {
Self {
agent_config,
compaction_config,
}
}
}
/// Agent loop parameterised over the tool context type (`Ctx`), the LLM
/// provider (`P`), the hooks (`H`), the message store (`M`) and the state
/// store (`S`).
///
/// Shared collaborators are kept behind `Arc` so each run can take its own
/// handle to them.
pub struct AgentLoop<Ctx, P, H, M, S>
where
P: LlmProvider,
H: AgentHooks,
M: MessageStore,
S: StateStore,
{
/// LLM provider handed to every run.
pub(super) provider: Arc<P>,
/// Tool registry for tools that use a `Ctx` context.
pub(super) tools: Arc<ToolRegistry<Ctx>>,
/// Hooks handed to every run.
pub(super) hooks: Arc<H>,
/// Message store handed to every run.
pub(super) message_store: Arc<M>,
/// State store handed to every run.
pub(super) state_store: Arc<S>,
/// Event store handed to every run; `send` also reads events back from it.
pub(super) event_store: Arc<dyn EventStore>,
/// Optional event authority; when `None`, `resolve_authority` falls back to
/// a new `LocalEventAuthority`.
pub(super) event_authority: Option<Arc<dyn EventAuthority>>,
/// Agent configuration, cloned into every run.
pub(super) config: AgentConfig,
/// Compaction configuration; `new` leaves it `None`, `with_compaction`
/// sets it.
pub(super) compaction_config: Option<CompactionConfig>,
/// Optional context compactor. Both constructors in this file leave it
/// `None` — presumably the builder sets it; verify against `builder`.
pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
/// Optional tool execution store. Both constructors in this file leave it
/// `None` — presumably the builder sets it; verify against `builder`.
pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
/// Tool audit sink; defaults to `NoopAuditSink` and can be replaced with
/// `with_audit_sink`.
pub(super) audit_sink: Arc<dyn crate::hooks::ToolAuditSink>,
/// Optional observability store (only with the `otel` feature); set via
/// `with_observability_store`.
#[cfg(feature = "otel")]
pub(super) observability_store: Option<Arc<dyn crate::observability::ObservabilityStore>>,
}
/// Returns a fresh [`AgentLoopBuilder`] whose four component slots are still
/// the unit type `()`.
#[must_use]
pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
    AgentLoopBuilder::<Ctx, (), (), (), ()>::new()
}
impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
where
Ctx: Send + Sync + 'static,
P: LlmProvider + 'static,
H: AgentHooks + 'static,
M: MessageStore + 'static,
S: StateStore + 'static,
{
/// Creates an agent loop from its required components.
///
/// The provider, tool registry, hooks and both stores are wrapped in `Arc`.
/// Every optional component starts out unset (`None`) and the audit sink is
/// a `NoopAuditSink`.
#[must_use]
pub fn new(
    provider: P,
    tools: ToolRegistry<Ctx>,
    hooks: H,
    message_store: M,
    state_store: S,
    event_store: Arc<dyn EventStore>,
    config: AgentConfig,
) -> Self {
    // Wrap the owned components first so the struct literal below can use
    // field-init shorthand.
    let provider = Arc::new(provider);
    let tools = Arc::new(tools);
    let hooks = Arc::new(hooks);
    let message_store = Arc::new(message_store);
    let state_store = Arc::new(state_store);

    Self {
        provider,
        tools,
        hooks,
        message_store,
        state_store,
        event_store,
        event_authority: None,
        config,
        compaction_config: None,
        compactor: None,
        execution_store: None,
        audit_sink: Arc::new(crate::hooks::NoopAuditSink),
        #[cfg(feature = "otel")]
        observability_store: None,
    }
}
#[must_use]
pub fn with_compaction(
provider: P,
tools: ToolRegistry<Ctx>,
hooks: H,
message_store: M,
state_store: S,
event_store: Arc<dyn EventStore>,
config: AgentLoopCompactionConfig,
) -> Self {
let AgentLoopCompactionConfig {
agent_config,
compaction_config,
} = config;
Self {
provider: Arc::new(provider),
tools: Arc::new(tools),
hooks: Arc::new(hooks),
message_store: Arc::new(message_store),
state_store: Arc::new(state_store),
event_store,
event_authority: None,
config: agent_config,
compaction_config: Some(compaction_config),
compactor: None,
execution_store: None,
audit_sink: Arc::new(crate::hooks::NoopAuditSink),
#[cfg(feature = "otel")]
observability_store: None,
}
}
/// Replaces the tool audit sink and returns the updated loop.
#[must_use]
pub fn with_audit_sink(mut self, sink: impl crate::hooks::ToolAuditSink + 'static) -> Self {
    let shared_sink: Arc<dyn crate::hooks::ToolAuditSink> = Arc::new(sink);
    self.audit_sink = shared_sink;
    self
}
/// Sets the observability store and returns the updated loop.
///
/// Only available when the `otel` feature is enabled.
#[cfg(feature = "otel")]
#[must_use]
pub fn with_observability_store(
    mut self,
    store: impl crate::observability::ObservabilityStore + 'static,
) -> Self {
    let shared_store: Arc<dyn crate::observability::ObservabilityStore> = Arc::new(store);
    self.observability_store = Some(shared_store);
    self
}
/// Returns the configured event authority, or a newly created
/// `LocalEventAuthority` when none is configured.
fn resolve_authority(&self) -> Arc<dyn EventAuthority> {
    match &self.event_authority {
        Some(configured) => Arc::clone(configured),
        None => Arc::new(LocalEventAuthority::new()),
    }
}
pub fn run(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
where
Ctx: Clone,
{
let (state_rx, handle) = self.run_abortable(thread_id, input, tool_context, cancel_token);
warn_on_detached_run_handle(handle);
recv_run_state(state_rx)
}
/// Starts a run with the given [`RunOptions`] and returns a future resolving
/// to its final state.
///
/// The run is spawned before this method returns; the returned future only
/// waits for the result. The task's `JoinHandle` is discarded, so the run is
/// detached and can be stopped through `cancel_token`.
///
/// # Errors
///
/// The returned future resolves to an error when the run task goes away
/// without reporting a final state.
pub fn run_with_options(
    &self,
    thread_id: ThreadId,
    input: AgentInput,
    tool_context: ToolContext<Ctx>,
    cancel_token: CancellationToken,
    run_options: RunOptions,
) -> impl Future<Output = anyhow::Result<AgentRunState>> + Send + 'static
where
    Ctx: Clone,
{
    let spawned = self.run_abortable_with_options(
        thread_id,
        input,
        tool_context,
        cancel_token,
        run_options,
    );
    let (final_state_rx, join_handle) = spawned;
    warn_on_detached_run_handle(join_handle);
    recv_run_state(final_state_rx)
}
/// Spawns a run with default [`RunOptions`].
///
/// Returns the receiver for the run's final state together with the
/// `JoinHandle` of the spawned task.
pub fn run_abortable(
    &self,
    thread_id: ThreadId,
    input: AgentInput,
    tool_context: ToolContext<Ctx>,
    cancel_token: CancellationToken,
) -> (
    oneshot::Receiver<AgentRunState>,
    tokio::task::JoinHandle<()>,
)
where
    Ctx: Clone,
{
    let default_options = RunOptions::default();
    self.run_abortable_with_options(
        thread_id,
        input,
        tool_context,
        cancel_token,
        default_options,
    )
}
/// Spawns a run on the tokio runtime and hands back the means to observe it.
///
/// Returns the receiver on which the final [`AgentRunState`] is delivered
/// and the `JoinHandle` of the spawned task. The run receives no follow-up
/// input channel (`input_rx` is `None`).
///
/// `run_options` is forwarded to the run loop only when the `otel` feature
/// is enabled; otherwise it is dropped unused. With `otel`, the context
/// captured at call time is attached to the spawned task.
pub fn run_abortable_with_options(
    &self,
    thread_id: ThreadId,
    input: AgentInput,
    tool_context: ToolContext<Ctx>,
    cancel_token: CancellationToken,
    run_options: RunOptions,
) -> (
    oneshot::Receiver<AgentRunState>,
    tokio::task::JoinHandle<()>,
)
where
    Ctx: Clone,
{
    #[cfg(not(feature = "otel"))]
    drop(run_options);

    let (state_tx, state_rx) = oneshot::channel();
    let authority = self.resolve_authority();

    // Everything the task needs is owned by `params`, so the spawned future
    // does not borrow from `self`.
    let params = RunLoopParameters {
        event_store: Arc::clone(&self.event_store),
        authority,
        thread_id,
        input,
        tool_context,
        provider: Arc::clone(&self.provider),
        tools: Arc::clone(&self.tools),
        hooks: Arc::clone(&self.hooks),
        message_store: Arc::clone(&self.message_store),
        state_store: Arc::clone(&self.state_store),
        config: self.config.clone(),
        compaction_config: self.compaction_config.clone(),
        compactor: self.compactor.clone(),
        execution_store: self.execution_store.clone(),
        audit_sink: Arc::clone(&self.audit_sink),
        cancel_token,
        input_rx: None,
        #[cfg(feature = "otel")]
        run_options,
        #[cfg(feature = "otel")]
        observability_store: self.observability_store.clone(),
    };

    #[cfg(feature = "otel")]
    let parent_cx = crate::observability::context::capture_context();

    let task = async move {
        let final_state = run_loop_isolated(params).await;
        // The receiver may already be gone; that is not an error here.
        let _ = state_tx.send(final_state);
    };

    #[cfg(feature = "otel")]
    let task = {
        use opentelemetry::trace::FutureExt;
        task.with_context(parent_cx)
    };

    (state_rx, tokio::spawn(task))
}
/// Spawns a run that can receive follow-up input, using default
/// [`RunOptions`], and returns its [`AgentHandle`].
pub fn run_persistent(
    &self,
    thread_id: ThreadId,
    input: AgentInput,
    tool_context: ToolContext<Ctx>,
    cancel_token: CancellationToken,
) -> AgentHandle
where
    Ctx: Clone,
{
    let default_options = RunOptions::default();
    self.run_persistent_with_options(
        thread_id,
        input,
        tool_context,
        cancel_token,
        default_options,
    )
}
/// Spawns a run that can receive follow-up input and returns its
/// [`AgentHandle`].
///
/// The handle carries the sender of a bounded input channel (capacity 32)
/// whose receiver is given to the run loop, the receiver for the final
/// [`AgentRunState`], and a clone of `cancel_token`. The task's `JoinHandle`
/// is not kept.
///
/// `run_options` is forwarded to the run loop only when the `otel` feature
/// is enabled; otherwise it is dropped unused. With `otel`, the context
/// captured at call time is attached to the spawned task.
pub fn run_persistent_with_options(
    &self,
    thread_id: ThreadId,
    input: AgentInput,
    tool_context: ToolContext<Ctx>,
    cancel_token: CancellationToken,
    run_options: RunOptions,
) -> AgentHandle
where
    Ctx: Clone,
{
    #[cfg(not(feature = "otel"))]
    drop(run_options);

    let (state_tx, state_rx) = oneshot::channel();
    let (input_tx, input_rx) = mpsc::channel(32);
    let authority = self.resolve_authority();

    // Keep one clone of the token for the caller; the original moves into
    // the run.
    let cancel_handle = cancel_token.clone();

    let params = RunLoopParameters {
        event_store: Arc::clone(&self.event_store),
        authority,
        thread_id,
        input,
        tool_context,
        provider: Arc::clone(&self.provider),
        tools: Arc::clone(&self.tools),
        hooks: Arc::clone(&self.hooks),
        message_store: Arc::clone(&self.message_store),
        state_store: Arc::clone(&self.state_store),
        config: self.config.clone(),
        compaction_config: self.compaction_config.clone(),
        compactor: self.compactor.clone(),
        execution_store: self.execution_store.clone(),
        audit_sink: Arc::clone(&self.audit_sink),
        cancel_token,
        input_rx: Some(input_rx),
        #[cfg(feature = "otel")]
        run_options,
        #[cfg(feature = "otel")]
        observability_store: self.observability_store.clone(),
    };

    #[cfg(feature = "otel")]
    let parent_cx = crate::observability::context::capture_context();

    let task = async move {
        let final_state = run_loop_isolated(params).await;
        // The receiver may already be gone; that is not an error here.
        let _ = state_tx.send(final_state);
    };

    #[cfg(feature = "otel")]
    let task = {
        use opentelemetry::trace::FutureExt;
        task.with_context(parent_cx)
    };

    // The join handle is intentionally not kept; the task runs detached.
    tokio::spawn(task);

    AgentHandle {
        input_tx,
        state_rx,
        cancel_token: cancel_handle,
    }
}
/// Executes a single turn with default [`RunOptions`] and returns its
/// outcome.
///
/// Unlike the `run*` methods, the work happens inside the returned future
/// rather than in a spawned task.
pub async fn run_turn(
    &self,
    thread_id: ThreadId,
    input: AgentInput,
    tool_context: ToolContext<Ctx>,
    cancel_token: CancellationToken,
    options: TurnOptions,
) -> crate::types::TurnOutcome
where
    Ctx: Clone,
{
    let default_run_options = RunOptions::default();
    let turn = self.run_turn_with_options(
        thread_id,
        input,
        tool_context,
        cancel_token,
        options,
        default_run_options,
    );
    turn.await
}
/// Executes a single turn via `run_single_turn` and returns its outcome.
///
/// The work happens inside the returned future rather than in a spawned
/// task. `run_options` is forwarded only when the `otel` feature is enabled;
/// otherwise it is dropped unused.
pub async fn run_turn_with_options(
    &self,
    thread_id: ThreadId,
    input: AgentInput,
    tool_context: ToolContext<Ctx>,
    cancel_token: CancellationToken,
    turn_options: TurnOptions,
    run_options: RunOptions,
) -> crate::types::TurnOutcome
where
    Ctx: Clone,
{
    #[cfg(not(feature = "otel"))]
    drop(run_options);

    let authority = self.resolve_authority();
    let params = TurnParameters {
        event_store: Arc::clone(&self.event_store),
        authority,
        thread_id,
        input,
        tool_context,
        provider: Arc::clone(&self.provider),
        tools: Arc::clone(&self.tools),
        hooks: Arc::clone(&self.hooks),
        message_store: Arc::clone(&self.message_store),
        state_store: Arc::clone(&self.state_store),
        config: self.config.clone(),
        compaction_config: self.compaction_config.clone(),
        compactor: self.compactor.clone(),
        execution_store: self.execution_store.clone(),
        audit_sink: Arc::clone(&self.audit_sink),
        cancel_token,
        turn_options,
        #[cfg(feature = "otel")]
        run_options,
        #[cfg(feature = "otel")]
        observability_store: self.observability_store.clone(),
    };

    run_single_turn(params).await
}
}
impl<P, H, M, S> AgentLoop<(), P, H, M, S>
where
P: LlmProvider + 'static,
H: AgentHooks + 'static,
M: MessageStore + 'static,
S: StateStore + 'static,
{
/// Sends a plain-text input on `thread_id` and returns the reply text.
///
/// Shorthand for [`Self::send`] with `AgentInput::Text`.
///
/// # Errors
///
/// Propagates every error that [`Self::send`] reports.
pub async fn ask(
    &self,
    thread_id: ThreadId,
    text: impl Into<String>,
) -> anyhow::Result<String> {
    let prompt: String = text.into();
    self.send(thread_id, AgentInput::Text(prompt)).await
}
/// Runs `input` on `thread_id` to completion and returns the reply text.
///
/// The reply is the concatenation, in store order, of the `text` of every
/// `AgentEvent::Text` event located after the events that already existed
/// for the thread before the run was started.
///
/// The run itself executes in a detached task. Dropping this future before
/// it resolves (for example from a timeout or `select!`) cancels the run's
/// token, so the run is not left going with no way to stop it.
///
/// # Errors
///
/// Returns an error when the event store cannot be read, when the run task
/// goes away without reporting a final state, or when the run ends in
/// `AgentRunState::Error`.
pub async fn send(&self, thread_id: ThreadId, input: AgentInput) -> anyhow::Result<String> {
    use crate::events::AgentEvent;

    // Number of events already recorded; only events past this point belong
    // to the run started below.
    let baseline = self.event_store.get_events(&thread_id).await?.len();

    // Nobody else holds this token, so tie it to the lifetime of this
    // future: if the caller drops us mid-run, the guard cancels the run.
    let cancel_token = CancellationToken::new();
    let cancel_on_drop = cancel_token.clone().drop_guard();

    let state = self
        .run(
            thread_id.clone(),
            input,
            ToolContext::new(()),
            cancel_token,
        )
        .await?;

    // The run has reported its final state; disarm the guard so the token
    // is not cancelled after a completed run.
    let _ = cancel_on_drop.disarm();

    if let AgentRunState::Error(error) = state {
        return Err(anyhow::Error::new(error));
    }

    let events = self.event_store.get_events(&thread_id).await?;
    let reply = events
        .into_iter()
        .skip(baseline)
        .filter_map(|envelope| match envelope.event {
            AgentEvent::Text { text, .. } => Some(text),
            _ => None,
        })
        .collect::<String>();
    Ok(reply)
}
}