mod builder;
mod helpers;
mod idempotency;
mod listen;
mod llm;
mod run_loop;
#[cfg(test)]
mod test_utils;
#[cfg(test)]
mod tests;
mod tool_execution;
mod turn;
mod types;
use self::run_loop::{run_loop, run_single_turn};
use self::types::{RunLoopParameters, TurnParameters};
pub use self::builder::AgentLoopBuilder;
use crate::context::{CompactionConfig, ContextCompactor};
use crate::events::{AgentEventEnvelope, SequenceCounter};
use crate::hooks::AgentHooks;
use crate::llm::LlmProvider;
use crate::stores::{MessageStore, StateStore, ToolExecutionStore};
use crate::tools::{ToolContext, ToolRegistry};
use crate::types::{AgentConfig, AgentInput, AgentRunState, ThreadId, TurnOutcome};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
pub struct AgentLoop<Ctx, P, H, M, S>
where
P: LlmProvider,
H: AgentHooks,
M: MessageStore,
S: StateStore,
{
pub(super) provider: Arc<P>,
pub(super) tools: Arc<ToolRegistry<Ctx>>,
pub(super) hooks: Arc<H>,
pub(super) message_store: Arc<M>,
pub(super) state_store: Arc<S>,
pub(super) config: AgentConfig,
pub(super) compaction_config: Option<CompactionConfig>,
pub(super) compactor: Option<Arc<dyn ContextCompactor>>,
pub(super) execution_store: Option<Arc<dyn ToolExecutionStore>>,
}
#[must_use]
pub fn builder<Ctx>() -> AgentLoopBuilder<Ctx, (), (), (), ()> {
AgentLoopBuilder::new()
}
impl<Ctx, P, H, M, S> AgentLoop<Ctx, P, H, M, S>
where
Ctx: Send + Sync + 'static,
P: LlmProvider + 'static,
H: AgentHooks + 'static,
M: MessageStore + 'static,
S: StateStore + 'static,
{
#[must_use]
pub fn new(
provider: P,
tools: ToolRegistry<Ctx>,
hooks: H,
message_store: M,
state_store: S,
config: AgentConfig,
) -> Self {
Self {
provider: Arc::new(provider),
tools: Arc::new(tools),
hooks: Arc::new(hooks),
message_store: Arc::new(message_store),
state_store: Arc::new(state_store),
config,
compaction_config: None,
compactor: None,
execution_store: None,
}
}
#[must_use]
pub fn with_compaction(
provider: P,
tools: ToolRegistry<Ctx>,
hooks: H,
message_store: M,
state_store: S,
config: AgentConfig,
compaction_config: CompactionConfig,
) -> Self {
Self {
provider: Arc::new(provider),
tools: Arc::new(tools),
hooks: Arc::new(hooks),
message_store: Arc::new(message_store),
state_store: Arc::new(state_store),
config,
compaction_config: Some(compaction_config),
compactor: None,
execution_store: None,
}
}
pub fn run(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
) -> (
mpsc::Receiver<AgentEventEnvelope>,
oneshot::Receiver<AgentRunState>,
)
where
Ctx: Clone,
{
let (event_tx, event_rx) = mpsc::channel(100);
let (state_tx, state_rx) = oneshot::channel();
let seq = SequenceCounter::new();
let provider = Arc::clone(&self.provider);
let tools = Arc::clone(&self.tools);
let hooks = Arc::clone(&self.hooks);
let message_store = Arc::clone(&self.message_store);
let state_store = Arc::clone(&self.state_store);
let config = self.config.clone();
let compaction_config = self.compaction_config.clone();
let compactor = self.compactor.clone();
let execution_store = self.execution_store.clone();
tokio::spawn(async move {
let result = run_loop(RunLoopParameters {
tx: event_tx,
seq,
thread_id,
input,
tool_context,
provider,
tools,
hooks,
message_store,
state_store,
config,
compaction_config,
compactor,
execution_store,
cancel_token,
})
.await;
let _ = state_tx.send(result);
});
(event_rx, state_rx)
}
pub fn run_turn(
&self,
thread_id: ThreadId,
input: AgentInput,
tool_context: ToolContext<Ctx>,
cancel_token: CancellationToken,
) -> (
mpsc::Receiver<AgentEventEnvelope>,
oneshot::Receiver<TurnOutcome>,
)
where
Ctx: Clone,
{
let (event_tx, event_rx) = mpsc::channel(100);
let (outcome_tx, outcome_rx) = oneshot::channel();
let seq = SequenceCounter::new();
let provider = Arc::clone(&self.provider);
let tools = Arc::clone(&self.tools);
let hooks = Arc::clone(&self.hooks);
let message_store = Arc::clone(&self.message_store);
let state_store = Arc::clone(&self.state_store);
let config = self.config.clone();
let compaction_config = self.compaction_config.clone();
let compactor = self.compactor.clone();
let execution_store = self.execution_store.clone();
tokio::spawn(async move {
let result = run_single_turn(TurnParameters {
tx: event_tx,
seq,
thread_id,
input,
tool_context,
provider,
tools,
hooks,
message_store,
state_store,
config,
compaction_config,
compactor,
execution_store,
cancel_token,
})
.await;
let _ = outcome_tx.send(result);
});
(event_rx, outcome_rx)
}
}