awaken_runtime/loop_runner/
mod.rs1pub(crate) mod actions;
7mod checkpoint;
8mod compaction;
9mod inference;
10mod orchestrator;
11#[cfg(feature = "parallel-tools")]
12pub mod parallel_merge;
13mod resume;
14mod setup;
15mod step;
16
17#[cfg(test)]
18mod tests;
19
20use std::sync::Arc;
21
22use crate::cancellation::CancellationToken;
23use crate::phase::{ExecutionEnv, PhaseRuntime};
24use crate::registry::AgentResolver;
25use crate::state::MutationBatch;
26use awaken_contract::StateError;
27use awaken_contract::contract::event_sink::EventSink;
28use awaken_contract::contract::identity::RunIdentity;
29use awaken_contract::contract::inference::InferenceOverride;
30use awaken_contract::contract::message::Message;
31use awaken_contract::contract::storage::ThreadRunStore;
32use awaken_contract::contract::suspension::ToolCallResume;
33use awaken_contract::contract::tool::{ToolResult, ToolStatus};
34use futures::channel::mpsc;
35use serde_json::Value;
36
37use crate::agent::state::{RunLifecycle, ToolCallStates};
38
39pub use actions::LoopActionHandlersPlugin;
41pub use resume::prepare_resume;
42
43pub struct LoopStatePlugin;
47
48impl crate::plugins::Plugin for LoopStatePlugin {
49 fn descriptor(&self) -> crate::plugins::PluginDescriptor {
50 crate::plugins::PluginDescriptor {
51 name: "__loop_state",
52 }
53 }
54
55 fn register(
56 &self,
57 r: &mut crate::plugins::PluginRegistrar,
58 ) -> Result<(), awaken_contract::StateError> {
59 use crate::agent::state::{ContextMessageStore, ContextThrottleState};
60 use crate::state::{KeyScope, StateKeyOptions};
61
62 r.register_key::<RunLifecycle>(StateKeyOptions::default())?;
63 r.register_key::<ToolCallStates>(StateKeyOptions {
64 scope: KeyScope::Thread,
65 persistent: true,
66 ..StateKeyOptions::default()
67 })?;
68 r.register_key::<ContextThrottleState>(StateKeyOptions::default())?;
69 r.register_key::<ContextMessageStore>(StateKeyOptions::default())?;
70 r.register_key::<crate::agent::state::PendingWorkKey>(StateKeyOptions::default())?;
71
72 Ok(())
73 }
74}
75
76#[derive(Debug, thiserror::Error)]
78pub enum AgentLoopError {
79 #[error("inference failed: {0}")]
80 InferenceFailed(String),
81 #[error("storage failed: {0}")]
82 StorageError(String),
83 #[error("phase error: {0}")]
84 PhaseError(#[from] awaken_contract::StateError),
85 #[error("runtime error: {0}")]
86 RuntimeError(#[from] crate::error::RuntimeError),
87 #[error("invalid resume: {0}")]
88 InvalidResume(String),
89}
90
91impl From<awaken_contract::contract::executor::InferenceExecutionError> for AgentLoopError {
92 fn from(e: awaken_contract::contract::executor::InferenceExecutionError) -> Self {
93 Self::InferenceFailed(e.to_string())
94 }
95}
96
97impl From<crate::execution::executor::ToolExecutorError> for AgentLoopError {
98 fn from(e: crate::execution::executor::ToolExecutorError) -> Self {
99 Self::InferenceFailed(e.to_string())
100 }
101}
102
103#[derive(Debug)]
105pub struct AgentRunResult {
106 pub run_id: String,
107 pub response: String,
108 pub termination: awaken_contract::contract::lifecycle::TerminationReason,
109 pub steps: usize,
110}
111
112pub(crate) use awaken_contract::now_ms;
115
116fn commit_update<S: crate::state::StateKey>(
117 store: &crate::state::StateStore,
118 update: S::Update,
119) -> Result<(), awaken_contract::StateError> {
120 let mut patch = MutationBatch::new();
121 patch.update::<S>(update);
122 store.commit(patch)?;
123 Ok(())
124}
125
126fn tool_result_to_content(result: &ToolResult) -> String {
127 match &result.message {
128 Some(msg) => msg.clone(),
129 None => serde_json::to_string(&result.data).unwrap_or_default(),
130 }
131}
132
133fn tool_result_to_resume_payload(result: &ToolResult) -> Value {
134 match result.status {
135 ToolStatus::Success => {
136 if result.metadata.is_empty() {
137 result.data.clone()
138 } else {
139 serde_json::json!({
140 "data": result.data,
141 "metadata": result.metadata,
142 })
143 }
144 }
145 ToolStatus::Error => {
146 if let Some(message) = result.message.as_ref() {
147 serde_json::json!({ "error": message })
148 } else {
149 result.data.clone()
150 }
151 }
152 ToolStatus::Pending => Value::Null,
153 }
154}
155
156pub struct AgentLoopParams<'a> {
158 pub resolver: &'a dyn AgentResolver,
160 pub agent_id: &'a str,
162 pub runtime: &'a PhaseRuntime,
164 pub sink: Arc<dyn EventSink>,
166 pub checkpoint_store: Option<&'a dyn ThreadRunStore>,
168 pub messages: Vec<Message>,
170 pub run_identity: RunIdentity,
172 pub cancellation_token: Option<CancellationToken>,
174 pub decision_rx: Option<mpsc::UnboundedReceiver<Vec<(String, ToolCallResume)>>>,
176 pub overrides: Option<InferenceOverride>,
178 pub frontend_tools: Vec<awaken_contract::contract::tool::ToolDescriptor>,
184 pub inbox: Option<crate::inbox::InboxReceiver>,
186 pub is_continuation: bool,
189}
190
191pub fn build_agent_env(
199 plugins: &[Arc<dyn crate::plugins::Plugin>],
200 agent: &crate::registry::ResolvedAgent,
201) -> Result<ExecutionEnv, StateError> {
202 let mut all_plugins =
203 crate::registry::resolve::inject_default_plugins(plugins.to_vec(), agent.max_rounds());
204
205 if let Some(policy) = agent.context_policy() {
206 all_plugins.push(Arc::new(crate::context::ContextTransformPlugin::new(
207 policy.clone(),
208 )));
209 }
210
211 ExecutionEnv::from_plugins(&all_plugins, &std::collections::HashSet::new())
212}
213
214pub async fn run_agent_loop(params: AgentLoopParams<'_>) -> Result<AgentRunResult, AgentLoopError> {
220 orchestrator::run_agent_loop_impl(params, None).await
221}
222
223pub(crate) async fn run_agent_loop_with_thread_context(
224 params: AgentLoopParams<'_>,
225 thread_ctx: Option<crate::ThreadContextSnapshot>,
226) -> Result<AgentRunResult, AgentLoopError> {
227 orchestrator::run_agent_loop_impl(params, thread_ctx).await
228}