Skip to main content

awaken_runtime/loop_runner/
mod.rs

//! Minimal sequential agent loop driven by state machines.
//!
//! Two state machines are involved:
//! - Run lifecycle: `RunLifecycle` (Running → StepCompleted → Done/Waiting)
//! - Tool call lifecycle: `ToolCallStates` (New → Running → Succeeded/Failed/Suspended)
5
6pub(crate) mod actions;
7mod checkpoint;
8mod compaction;
9mod inference;
10mod orchestrator;
11#[cfg(feature = "parallel-tools")]
12pub mod parallel_merge;
13mod resume;
14mod setup;
15mod step;
16
17#[cfg(test)]
18mod tests;
19
20use std::sync::Arc;
21
22use crate::cancellation::CancellationToken;
23use crate::phase::{ExecutionEnv, PhaseRuntime};
24use crate::registry::AgentResolver;
25use crate::state::MutationBatch;
26use awaken_contract::StateError;
27use awaken_contract::contract::event_sink::EventSink;
28use awaken_contract::contract::identity::RunIdentity;
29use awaken_contract::contract::inference::InferenceOverride;
30use awaken_contract::contract::message::Message;
31use awaken_contract::contract::storage::ThreadRunStore;
32use awaken_contract::contract::suspension::ToolCallResume;
33use awaken_contract::contract::tool::{ToolResult, ToolStatus};
34use futures::channel::mpsc;
35use serde_json::Value;
36
37use crate::agent::state::{RunLifecycle, ToolCallStates};
38
39// Re-export submodule items used by external callers
40pub use actions::LoopActionHandlersPlugin;
41pub use resume::prepare_resume;
42
43/// Plugin that registers the core state keys required by the loop runner.
44///
45/// Must be installed on the `StateStore` before running the loop.
46pub struct LoopStatePlugin;
47
48impl crate::plugins::Plugin for LoopStatePlugin {
49    fn descriptor(&self) -> crate::plugins::PluginDescriptor {
50        crate::plugins::PluginDescriptor {
51            name: "__loop_state",
52        }
53    }
54
55    fn register(
56        &self,
57        r: &mut crate::plugins::PluginRegistrar,
58    ) -> Result<(), awaken_contract::StateError> {
59        use crate::agent::state::{ContextMessageStore, ContextThrottleState};
60        use crate::state::{KeyScope, StateKeyOptions};
61
62        r.register_key::<RunLifecycle>(StateKeyOptions::default())?;
63        r.register_key::<ToolCallStates>(StateKeyOptions {
64            scope: KeyScope::Thread,
65            persistent: true,
66            ..StateKeyOptions::default()
67        })?;
68        r.register_key::<ContextThrottleState>(StateKeyOptions::default())?;
69        r.register_key::<ContextMessageStore>(StateKeyOptions::default())?;
70        r.register_key::<crate::agent::state::PendingWorkKey>(StateKeyOptions::default())?;
71
72        Ok(())
73    }
74}
75
/// Errors from the agent loop.
///
/// String-carrying variants hold only a rendered message; the `#[from]`
/// variants wrap the source error and are produced automatically by `?`.
#[derive(Debug, thiserror::Error)]
pub enum AgentLoopError {
    /// Rendered failure message. The `From` impls in this module build this
    /// variant from both `InferenceExecutionError` and `ToolExecutorError`.
    #[error("inference failed: {0}")]
    InferenceFailed(String),
    /// Storage failure, carried as a rendered message.
    #[error("storage failed: {0}")]
    StorageError(String),
    /// A `StateError`, converted automatically via `#[from]`.
    #[error("phase error: {0}")]
    PhaseError(#[from] awaken_contract::StateError),
    /// A `RuntimeError`, converted automatically via `#[from]`.
    #[error("runtime error: {0}")]
    RuntimeError(#[from] crate::error::RuntimeError),
    /// A resume request that was rejected, carried as a rendered message.
    #[error("invalid resume: {0}")]
    InvalidResume(String),
}
90
/// Converts an inference execution error into
/// [`AgentLoopError::InferenceFailed`], keeping only its `to_string()` text.
impl From<awaken_contract::contract::executor::InferenceExecutionError> for AgentLoopError {
    fn from(e: awaken_contract::contract::executor::InferenceExecutionError) -> Self {
        Self::InferenceFailed(e.to_string())
    }
}
96
/// Converts a tool executor error into [`AgentLoopError::InferenceFailed`],
/// keeping only its `to_string()` text.
///
/// NOTE(review): tool executor failures are reported through the
/// `InferenceFailed` variant, so they render as "inference failed: ...".
/// Confirm this is intended rather than a missing dedicated variant.
impl From<crate::execution::executor::ToolExecutorError> for AgentLoopError {
    fn from(e: crate::execution::executor::ToolExecutorError) -> Self {
        Self::InferenceFailed(e.to_string())
    }
}
102
/// Result of running the agent loop.
#[derive(Debug)]
pub struct AgentRunResult {
    /// Identifier of the run.
    pub run_id: String,
    /// Response text of the run (presumably the final assistant output —
    /// confirm against the orchestrator).
    pub response: String,
    /// Why the run terminated.
    pub termination: awaken_contract::contract::lifecycle::TerminationReason,
    /// Step count for the run (presumably the number of loop steps executed —
    /// confirm against the orchestrator).
    pub steps: usize,
}
111
112// -- Shared helpers --
113
114pub(crate) use awaken_contract::now_ms;
115
116fn commit_update<S: crate::state::StateKey>(
117    store: &crate::state::StateStore,
118    update: S::Update,
119) -> Result<(), awaken_contract::StateError> {
120    let mut patch = MutationBatch::new();
121    patch.update::<S>(update);
122    store.commit(patch)?;
123    Ok(())
124}
125
126fn tool_result_to_content(result: &ToolResult) -> String {
127    match &result.message {
128        Some(msg) => msg.clone(),
129        None => serde_json::to_string(&result.data).unwrap_or_default(),
130    }
131}
132
133fn tool_result_to_resume_payload(result: &ToolResult) -> Value {
134    match result.status {
135        ToolStatus::Success => {
136            if result.metadata.is_empty() {
137                result.data.clone()
138            } else {
139                serde_json::json!({
140                    "data": result.data,
141                    "metadata": result.metadata,
142                })
143            }
144        }
145        ToolStatus::Error => {
146            if let Some(message) = result.message.as_ref() {
147                serde_json::json!({ "error": message })
148            } else {
149                result.data.clone()
150            }
151        }
152        ToolStatus::Pending => Value::Null,
153    }
154}
155
/// All parameters for executing the agent loop.
///
/// The lifetime `'a` bounds the borrowed inputs: the resolver, the agent id,
/// the phase runtime, and the optional checkpoint store. Everything else is
/// owned by the params value.
pub struct AgentLoopParams<'a> {
    /// Resolves agent IDs to config + execution environment.
    pub resolver: &'a dyn AgentResolver,
    /// Initial agent to resolve at loop start.
    pub agent_id: &'a str,
    /// Phase runtime (state store + hook executor).
    pub runtime: &'a PhaseRuntime,
    /// Event sink for streaming events to the caller.
    pub sink: Arc<dyn EventSink>,
    /// Optional persistent storage for checkpointing.
    pub checkpoint_store: Option<&'a dyn ThreadRunStore>,
    /// Messages to seed the conversation (history + new user input).
    pub messages: Vec<Message>,
    /// Run identity (thread, run, agent IDs).
    pub run_identity: RunIdentity,
    /// Cooperative cancellation token.
    pub cancellation_token: Option<CancellationToken>,
    /// Live decision channel for suspended tool calls (batched by sender).
    ///
    /// Each received item is a batch of `(String, ToolCallResume)` pairs
    /// (the string is presumably the tool call id — confirm against the sender).
    pub decision_rx: Option<mpsc::UnboundedReceiver<Vec<(String, ToolCallResume)>>>,
    /// Inference parameter overrides for this run.
    pub overrides: Option<InferenceOverride>,
    /// Frontend-defined tool descriptors to merge into the resolved agent.
    ///
    /// These are tools defined by the frontend (e.g. CopilotKit `useFrontendTool`)
    /// whose execution happens client-side. They are made visible to the LLM but
    /// have no executor — the runtime intercepts them before execution and suspends.
    pub frontend_tools: Vec<awaken_contract::contract::tool::ToolDescriptor>,
    /// Optional inbox receiver for background-task messages.
    pub inbox: Option<crate::inbox::InboxReceiver>,
    /// When `true`, the run is a continuation of a previous awaiting_tasks run.
    /// The orchestrator emits `SetRunning` instead of `Start`.
    pub is_continuation: bool,
}
190
191/// Build an execution environment for the agent loop.
192///
193/// Injects runtime-required default plugins and conditionally adds
194/// context truncation when a policy is provided. All transforms and hooks
195/// flow through the standard plugin registration mechanism.
196///
197/// Prefer `AgentRuntime::run()` for production use.
198pub fn build_agent_env(
199    plugins: &[Arc<dyn crate::plugins::Plugin>],
200    agent: &crate::registry::ResolvedAgent,
201) -> Result<ExecutionEnv, StateError> {
202    let mut all_plugins =
203        crate::registry::resolve::inject_default_plugins(plugins.to_vec(), agent.max_rounds());
204
205    if let Some(policy) = agent.context_policy() {
206        all_plugins.push(Arc::new(crate::context::ContextTransformPlugin::new(
207            policy.clone(),
208        )));
209    }
210
211    ExecutionEnv::from_plugins(&all_plugins, &std::collections::HashSet::new())
212}
213
/// Execute the agent loop. Prefer `AgentRuntime::run()` for production use.
///
/// Handles both fresh runs and resumed runs (state-driven detection).
/// Supports dynamic agent handoff via `ActiveAgentIdKey` re-resolve at step boundaries.
/// Cooperative cancellation via `CancellationToken`.
///
/// This is a thin wrapper: it forwards `params` to
/// `orchestrator::run_agent_loop_impl` with no thread-context snapshot.
///
/// # Errors
///
/// Returns whatever [`AgentLoopError`] the orchestrator reports.
pub async fn run_agent_loop(params: AgentLoopParams<'_>) -> Result<AgentRunResult, AgentLoopError> {
    // `None`: the public entry point supplies no thread-context snapshot.
    orchestrator::run_agent_loop_impl(params, None).await
}
222
/// Crate-internal variant of the agent loop entry point that also forwards an
/// optional `ThreadContextSnapshot` to `orchestrator::run_agent_loop_impl`.
///
/// Passing `None` for `thread_ctx` makes the same orchestrator call as
/// `run_agent_loop`.
///
/// # Errors
///
/// Returns whatever [`AgentLoopError`] the orchestrator reports.
pub(crate) async fn run_agent_loop_with_thread_context(
    params: AgentLoopParams<'_>,
    thread_ctx: Option<crate::ThreadContextSnapshot>,
) -> Result<AgentRunResult, AgentLoopError> {
    orchestrator::run_agent_loop_impl(params, thread_ctx).await
}