awaken_runtime/loop_runner/
mod.rs1pub(crate) mod actions;
7mod checkpoint;
8mod inference;
9mod orchestrator;
10#[cfg(feature = "parallel-tools")]
11pub mod parallel_merge;
12mod resume;
13mod setup;
14mod step;
15
16#[cfg(test)]
17mod tests;
18
19use std::sync::Arc;
20
21use crate::cancellation::CancellationToken;
22use crate::phase::{ExecutionEnv, PhaseRuntime};
23use crate::registry::AgentResolver;
24use crate::state::MutationBatch;
25use awaken_contract::StateError;
26use awaken_contract::contract::event_sink::EventSink;
27use awaken_contract::contract::identity::RunIdentity;
28use awaken_contract::contract::inference::InferenceOverride;
29use awaken_contract::contract::message::Message;
30use awaken_contract::contract::storage::ThreadRunStore;
31use awaken_contract::contract::suspension::ToolCallResume;
32use awaken_contract::contract::tool::{ToolResult, ToolStatus};
33use futures::channel::mpsc;
34use serde_json::Value;
35
36use crate::agent::state::{RunLifecycle, ToolCallStates};
37
38pub use actions::LoopActionHandlersPlugin;
40pub use resume::prepare_resume;
41
42pub struct LoopStatePlugin;
46
47impl crate::plugins::Plugin for LoopStatePlugin {
48 fn descriptor(&self) -> crate::plugins::PluginDescriptor {
49 crate::plugins::PluginDescriptor {
50 name: "__loop_state",
51 }
52 }
53
54 fn register(
55 &self,
56 r: &mut crate::plugins::PluginRegistrar,
57 ) -> Result<(), awaken_contract::StateError> {
58 use crate::agent::state::{ContextMessageStore, ContextThrottleState};
59 use crate::state::{KeyScope, StateKeyOptions};
60
61 r.register_key::<RunLifecycle>(StateKeyOptions::default())?;
62 r.register_key::<ToolCallStates>(StateKeyOptions {
63 scope: KeyScope::Thread,
64 persistent: true,
65 ..StateKeyOptions::default()
66 })?;
67 r.register_key::<ContextThrottleState>(StateKeyOptions::default())?;
68 r.register_key::<ContextMessageStore>(StateKeyOptions::default())?;
69 r.register_key::<crate::agent::state::PendingWorkKey>(StateKeyOptions::default())?;
70
71 Ok(())
72 }
73}
74
75#[derive(Debug, thiserror::Error)]
77pub enum AgentLoopError {
78 #[error("inference failed: {0}")]
79 InferenceFailed(String),
80 #[error("storage failed: {0}")]
81 StorageError(String),
82 #[error("phase error: {0}")]
83 PhaseError(#[from] awaken_contract::StateError),
84 #[error("runtime error: {0}")]
85 RuntimeError(#[from] crate::error::RuntimeError),
86 #[error("invalid resume: {0}")]
87 InvalidResume(String),
88}
89
90impl From<awaken_contract::contract::executor::InferenceExecutionError> for AgentLoopError {
91 fn from(e: awaken_contract::contract::executor::InferenceExecutionError) -> Self {
92 Self::InferenceFailed(e.to_string())
93 }
94}
95
96impl From<crate::execution::executor::ToolExecutorError> for AgentLoopError {
97 fn from(e: crate::execution::executor::ToolExecutorError) -> Self {
98 Self::InferenceFailed(e.to_string())
99 }
100}
101
102#[derive(Debug)]
104pub struct AgentRunResult {
105 pub run_id: String,
106 pub response: String,
107 pub termination: awaken_contract::contract::lifecycle::TerminationReason,
108 pub steps: usize,
109}
110
111pub(crate) use awaken_contract::now_ms;
114
115fn commit_update<S: crate::state::StateKey>(
116 store: &crate::state::StateStore,
117 update: S::Update,
118) -> Result<(), awaken_contract::StateError> {
119 let mut patch = MutationBatch::new();
120 patch.update::<S>(update);
121 store.commit(patch)?;
122 Ok(())
123}
124
125fn tool_result_to_content(result: &ToolResult) -> String {
126 match &result.message {
127 Some(msg) => msg.clone(),
128 None => serde_json::to_string(&result.data).unwrap_or_default(),
129 }
130}
131
132fn tool_result_to_resume_payload(result: &ToolResult) -> Value {
133 match result.status {
134 ToolStatus::Success => {
135 if result.metadata.is_empty() {
136 result.data.clone()
137 } else {
138 serde_json::json!({
139 "data": result.data,
140 "metadata": result.metadata,
141 })
142 }
143 }
144 ToolStatus::Error => {
145 if let Some(message) = result.message.as_ref() {
146 serde_json::json!({ "error": message })
147 } else {
148 result.data.clone()
149 }
150 }
151 ToolStatus::Pending => Value::Null,
152 }
153}
154
155pub struct AgentLoopParams<'a> {
157 pub resolver: &'a dyn AgentResolver,
159 pub agent_id: &'a str,
161 pub runtime: &'a PhaseRuntime,
163 pub sink: Arc<dyn EventSink>,
165 pub checkpoint_store: Option<&'a dyn ThreadRunStore>,
167 pub messages: Vec<Message>,
169 pub run_identity: RunIdentity,
171 pub cancellation_token: Option<CancellationToken>,
173 pub decision_rx: Option<mpsc::UnboundedReceiver<Vec<(String, ToolCallResume)>>>,
175 pub overrides: Option<InferenceOverride>,
177 pub frontend_tools: Vec<awaken_contract::contract::tool::ToolDescriptor>,
183 pub inbox: Option<crate::inbox::InboxReceiver>,
185 pub is_continuation: bool,
188}
189
190pub fn build_agent_env(
198 plugins: &[Arc<dyn crate::plugins::Plugin>],
199 agent: &crate::registry::ResolvedAgent,
200) -> Result<ExecutionEnv, StateError> {
201 let mut all_plugins =
202 crate::registry::resolve::inject_default_plugins(plugins.to_vec(), agent.max_rounds());
203
204 if let Some(policy) = agent.context_policy() {
205 all_plugins.push(Arc::new(crate::context::ContextTransformPlugin::new(
206 policy.clone(),
207 )));
208 }
209
210 ExecutionEnv::from_plugins(&all_plugins, &std::collections::HashSet::new())
211}
212
213pub async fn run_agent_loop(params: AgentLoopParams<'_>) -> Result<AgentRunResult, AgentLoopError> {
219 orchestrator::run_agent_loop_impl(params).await
220}