Skip to main content

awaken_runtime/loop_runner/
mod.rs

1//! Minimal sequential agent loop driven by state machines.
2//!
3//! Run lifecycle: RunLifecycle (Running → StepCompleted → Done/Waiting)
4//! Tool call lifecycle: ToolCallStates (New → Running → Succeeded/Failed/Suspended)
5
6pub(crate) mod actions;
7mod checkpoint;
8mod inference;
9mod orchestrator;
10#[cfg(feature = "parallel-tools")]
11pub mod parallel_merge;
12mod resume;
13mod setup;
14mod step;
15
16#[cfg(test)]
17mod tests;
18
19use std::sync::Arc;
20
21use crate::cancellation::CancellationToken;
22use crate::phase::{ExecutionEnv, PhaseRuntime};
23use crate::registry::AgentResolver;
24use crate::state::MutationBatch;
25use awaken_contract::StateError;
26use awaken_contract::contract::event_sink::EventSink;
27use awaken_contract::contract::identity::RunIdentity;
28use awaken_contract::contract::inference::InferenceOverride;
29use awaken_contract::contract::message::Message;
30use awaken_contract::contract::storage::ThreadRunStore;
31use awaken_contract::contract::suspension::ToolCallResume;
32use awaken_contract::contract::tool::{ToolResult, ToolStatus};
33use futures::channel::mpsc;
34use serde_json::Value;
35
36use crate::agent::state::{RunLifecycle, ToolCallStates};
37
38// Re-export submodule items used by external callers
39pub use actions::LoopActionHandlersPlugin;
40pub use resume::prepare_resume;
41
42/// Plugin that registers the core state keys required by the loop runner.
43///
44/// Must be installed on the `StateStore` before running the loop.
45pub struct LoopStatePlugin;
46
47impl crate::plugins::Plugin for LoopStatePlugin {
48    fn descriptor(&self) -> crate::plugins::PluginDescriptor {
49        crate::plugins::PluginDescriptor {
50            name: "__loop_state",
51        }
52    }
53
54    fn register(
55        &self,
56        r: &mut crate::plugins::PluginRegistrar,
57    ) -> Result<(), awaken_contract::StateError> {
58        use crate::agent::state::{ContextMessageStore, ContextThrottleState};
59        use crate::state::{KeyScope, StateKeyOptions};
60
61        r.register_key::<RunLifecycle>(StateKeyOptions::default())?;
62        r.register_key::<ToolCallStates>(StateKeyOptions {
63            scope: KeyScope::Thread,
64            persistent: true,
65            ..StateKeyOptions::default()
66        })?;
67        r.register_key::<ContextThrottleState>(StateKeyOptions::default())?;
68        r.register_key::<ContextMessageStore>(StateKeyOptions::default())?;
69        r.register_key::<crate::agent::state::PendingWorkKey>(StateKeyOptions::default())?;
70
71        Ok(())
72    }
73}
74
75/// Errors from the agent loop.
76#[derive(Debug, thiserror::Error)]
77pub enum AgentLoopError {
78    #[error("inference failed: {0}")]
79    InferenceFailed(String),
80    #[error("storage failed: {0}")]
81    StorageError(String),
82    #[error("phase error: {0}")]
83    PhaseError(#[from] awaken_contract::StateError),
84    #[error("runtime error: {0}")]
85    RuntimeError(#[from] crate::error::RuntimeError),
86    #[error("invalid resume: {0}")]
87    InvalidResume(String),
88}
89
90impl From<awaken_contract::contract::executor::InferenceExecutionError> for AgentLoopError {
91    fn from(e: awaken_contract::contract::executor::InferenceExecutionError) -> Self {
92        Self::InferenceFailed(e.to_string())
93    }
94}
95
96impl From<crate::execution::executor::ToolExecutorError> for AgentLoopError {
97    fn from(e: crate::execution::executor::ToolExecutorError) -> Self {
98        Self::InferenceFailed(e.to_string())
99    }
100}
101
102/// Result of running the agent loop.
103#[derive(Debug)]
104pub struct AgentRunResult {
105    pub run_id: String,
106    pub response: String,
107    pub termination: awaken_contract::contract::lifecycle::TerminationReason,
108    pub steps: usize,
109}
110
111// -- Shared helpers --
112
113pub(crate) use awaken_contract::now_ms;
114
115fn commit_update<S: crate::state::StateKey>(
116    store: &crate::state::StateStore,
117    update: S::Update,
118) -> Result<(), awaken_contract::StateError> {
119    let mut patch = MutationBatch::new();
120    patch.update::<S>(update);
121    store.commit(patch)?;
122    Ok(())
123}
124
125fn tool_result_to_content(result: &ToolResult) -> String {
126    match &result.message {
127        Some(msg) => msg.clone(),
128        None => serde_json::to_string(&result.data).unwrap_or_default(),
129    }
130}
131
132fn tool_result_to_resume_payload(result: &ToolResult) -> Value {
133    match result.status {
134        ToolStatus::Success => {
135            if result.metadata.is_empty() {
136                result.data.clone()
137            } else {
138                serde_json::json!({
139                    "data": result.data,
140                    "metadata": result.metadata,
141                })
142            }
143        }
144        ToolStatus::Error => {
145            if let Some(message) = result.message.as_ref() {
146                serde_json::json!({ "error": message })
147            } else {
148                result.data.clone()
149            }
150        }
151        ToolStatus::Pending => Value::Null,
152    }
153}
154
155/// All parameters for executing the agent loop.
156pub struct AgentLoopParams<'a> {
157    /// Resolves agent IDs to config + execution environment.
158    pub resolver: &'a dyn AgentResolver,
159    /// Initial agent to resolve at loop start.
160    pub agent_id: &'a str,
161    /// Phase runtime (state store + hook executor).
162    pub runtime: &'a PhaseRuntime,
163    /// Event sink for streaming events to the caller.
164    pub sink: Arc<dyn EventSink>,
165    /// Optional persistent storage for checkpointing.
166    pub checkpoint_store: Option<&'a dyn ThreadRunStore>,
167    /// Messages to seed the conversation (history + new user input).
168    pub messages: Vec<Message>,
169    /// Run identity (thread, run, agent IDs).
170    pub run_identity: RunIdentity,
171    /// Cooperative cancellation token.
172    pub cancellation_token: Option<CancellationToken>,
173    /// Live decision channel for suspended tool calls (batched by sender).
174    pub decision_rx: Option<mpsc::UnboundedReceiver<Vec<(String, ToolCallResume)>>>,
175    /// Inference parameter overrides for this run.
176    pub overrides: Option<InferenceOverride>,
177    /// Frontend-defined tool descriptors to merge into the resolved agent.
178    ///
179    /// These are tools defined by the frontend (e.g. CopilotKit `useFrontendTool`)
180    /// whose execution happens client-side. They are made visible to the LLM but
181    /// have no executor — the runtime intercepts them before execution and suspends.
182    pub frontend_tools: Vec<awaken_contract::contract::tool::ToolDescriptor>,
183    /// Optional inbox receiver for background-task messages.
184    pub inbox: Option<crate::inbox::InboxReceiver>,
185    /// When `true`, the run is a continuation of a previous awaiting_tasks run.
186    /// The orchestrator emits `SetRunning` instead of `Start`.
187    pub is_continuation: bool,
188}
189
190/// Build an execution environment for the agent loop.
191///
192/// Injects runtime-required default plugins and conditionally adds
193/// context truncation when a policy is provided. All transforms and hooks
194/// flow through the standard plugin registration mechanism.
195///
196/// Prefer `AgentRuntime::run()` for production use.
197pub fn build_agent_env(
198    plugins: &[Arc<dyn crate::plugins::Plugin>],
199    agent: &crate::registry::ResolvedAgent,
200) -> Result<ExecutionEnv, StateError> {
201    let mut all_plugins =
202        crate::registry::resolve::inject_default_plugins(plugins.to_vec(), agent.max_rounds());
203
204    if let Some(policy) = agent.context_policy() {
205        all_plugins.push(Arc::new(crate::context::ContextTransformPlugin::new(
206            policy.clone(),
207        )));
208    }
209
210    ExecutionEnv::from_plugins(&all_plugins, &std::collections::HashSet::new())
211}
212
213/// Execute the agent loop. Prefer `AgentRuntime::run()` for production use.
214///
215/// Handles both fresh runs and resumed runs (state-driven detection).
216/// Supports dynamic agent handoff via `ActiveAgentIdKey` re-resolve at step boundaries.
217/// Cooperative cancellation via `CancellationToken`.
218pub async fn run_agent_loop(params: AgentLoopParams<'_>) -> Result<AgentRunResult, AgentLoopError> {
219    orchestrator::run_agent_loop_impl(params).await
220}