pub(crate) mod actions;
mod checkpoint;
mod compaction;
mod inference;
mod orchestrator;
#[cfg(feature = "parallel-tools")]
pub mod parallel_merge;
mod resume;
mod setup;
mod step;
#[cfg(test)]
mod tests;
use std::sync::Arc;
use crate::cancellation::CancellationToken;
use crate::phase::{ExecutionEnv, PhaseRuntime};
use crate::registry::AgentResolver;
use crate::state::MutationBatch;
use awaken_contract::StateError;
use awaken_contract::contract::event_sink::EventSink;
use awaken_contract::contract::identity::RunIdentity;
use awaken_contract::contract::inference::InferenceOverride;
use awaken_contract::contract::message::Message;
use awaken_contract::contract::storage::ThreadRunStore;
use awaken_contract::contract::suspension::ToolCallResume;
use awaken_contract::contract::tool::{ToolResult, ToolStatus};
use futures::channel::mpsc;
use serde_json::Value;
use crate::agent::state::{RunLifecycle, ToolCallStates};
pub use actions::LoopActionHandlersPlugin;
pub use resume::prepare_resume;
pub struct LoopStatePlugin;
impl crate::plugins::Plugin for LoopStatePlugin {
fn descriptor(&self) -> crate::plugins::PluginDescriptor {
crate::plugins::PluginDescriptor {
name: "__loop_state",
}
}
fn register(
&self,
r: &mut crate::plugins::PluginRegistrar,
) -> Result<(), awaken_contract::StateError> {
use crate::agent::state::{ContextMessageStore, ContextThrottleState};
use crate::state::{KeyScope, StateKeyOptions};
r.register_key::<RunLifecycle>(StateKeyOptions::default())?;
r.register_key::<ToolCallStates>(StateKeyOptions {
scope: KeyScope::Thread,
persistent: true,
..StateKeyOptions::default()
})?;
r.register_key::<ContextThrottleState>(StateKeyOptions::default())?;
r.register_key::<ContextMessageStore>(StateKeyOptions::default())?;
r.register_key::<crate::agent::state::PendingWorkKey>(StateKeyOptions::default())?;
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub enum AgentLoopError {
#[error("inference failed: {0}")]
InferenceFailed(String),
#[error("storage failed: {0}")]
StorageError(String),
#[error("phase error: {0}")]
PhaseError(#[from] awaken_contract::StateError),
#[error("runtime error: {0}")]
RuntimeError(#[from] crate::error::RuntimeError),
#[error("invalid resume: {0}")]
InvalidResume(String),
}
impl From<awaken_contract::contract::executor::InferenceExecutionError> for AgentLoopError {
fn from(e: awaken_contract::contract::executor::InferenceExecutionError) -> Self {
Self::InferenceFailed(e.to_string())
}
}
impl From<crate::execution::executor::ToolExecutorError> for AgentLoopError {
fn from(e: crate::execution::executor::ToolExecutorError) -> Self {
Self::InferenceFailed(e.to_string())
}
}
#[derive(Debug)]
pub struct AgentRunResult {
pub run_id: String,
pub response: String,
pub termination: awaken_contract::contract::lifecycle::TerminationReason,
pub steps: usize,
}
pub(crate) use awaken_contract::now_ms;
fn commit_update<S: crate::state::StateKey>(
store: &crate::state::StateStore,
update: S::Update,
) -> Result<(), awaken_contract::StateError> {
let mut patch = MutationBatch::new();
patch.update::<S>(update);
store.commit(patch)?;
Ok(())
}
fn tool_result_to_content(result: &ToolResult) -> String {
match &result.message {
Some(msg) => msg.clone(),
None => serde_json::to_string(&result.data).unwrap_or_default(),
}
}
fn tool_result_to_resume_payload(result: &ToolResult) -> Value {
match result.status {
ToolStatus::Success => {
if result.metadata.is_empty() {
result.data.clone()
} else {
serde_json::json!({
"data": result.data,
"metadata": result.metadata,
})
}
}
ToolStatus::Error => {
if let Some(message) = result.message.as_ref() {
serde_json::json!({ "error": message })
} else {
result.data.clone()
}
}
ToolStatus::Pending => Value::Null,
}
}
pub struct AgentLoopParams<'a> {
pub resolver: &'a dyn AgentResolver,
pub agent_id: &'a str,
pub runtime: &'a PhaseRuntime,
pub sink: Arc<dyn EventSink>,
pub checkpoint_store: Option<&'a dyn ThreadRunStore>,
pub messages: Vec<Message>,
pub run_identity: RunIdentity,
pub cancellation_token: Option<CancellationToken>,
pub decision_rx: Option<mpsc::UnboundedReceiver<Vec<(String, ToolCallResume)>>>,
pub overrides: Option<InferenceOverride>,
pub frontend_tools: Vec<awaken_contract::contract::tool::ToolDescriptor>,
pub inbox: Option<crate::inbox::InboxReceiver>,
pub is_continuation: bool,
}
pub fn build_agent_env(
plugins: &[Arc<dyn crate::plugins::Plugin>],
agent: &crate::registry::ResolvedAgent,
) -> Result<ExecutionEnv, StateError> {
let mut all_plugins =
crate::registry::resolve::inject_default_plugins(plugins.to_vec(), agent.max_rounds());
if let Some(policy) = agent.context_policy() {
all_plugins.push(Arc::new(crate::context::ContextTransformPlugin::new(
policy.clone(),
)));
}
ExecutionEnv::from_plugins(&all_plugins, &std::collections::HashSet::new())
}
pub async fn run_agent_loop(params: AgentLoopParams<'_>) -> Result<AgentRunResult, AgentLoopError> {
orchestrator::run_agent_loop_impl(params, None).await
}
pub(crate) async fn run_agent_loop_with_thread_context(
params: AgentLoopParams<'_>,
thread_ctx: Option<crate::ThreadContextSnapshot>,
) -> Result<AgentRunResult, AgentLoopError> {
orchestrator::run_agent_loop_impl(params, thread_ctx).await
}