use crate::context::{TruncationState, try_consume_compaction_event};
use crate::inbox::inbox_payload_messages;
use crate::state::StateStore;
use awaken_contract::contract::event::AgentEvent;
use awaken_contract::contract::lifecycle::{RunStatus, TerminationReason};
use awaken_contract::contract::message::{Message, Role, gen_message_id};
use awaken_contract::model::Phase;
use super::checkpoint::{
CheckpointPersist, StepCompletion, check_termination, complete_step, emit_state_snapshot,
persist_checkpoint,
};
use super::resume::{
WaitOutcome, detect_and_replay_resume, has_suspended_calls, wait_for_resume_or_cancel,
};
use super::setup::{PreparedRun, prepare_run};
use super::step::{self, StepContext, StepOutcome, execute_step};
use super::{AgentLoopError, AgentLoopParams, AgentRunResult, commit_update, now_ms};
use crate::agent::state::{RunLifecycle, RunLifecycleUpdate, ToolCallStates, ToolCallStatesUpdate};
#[cfg(feature = "handoff")]
use crate::state::MutationBatch;
/// Reports whether the loop state currently flags pending background work.
///
/// An absent `PendingWorkKey` entry is treated as "no pending work" rather
/// than an error.
fn has_pending_work(store: &StateStore) -> bool {
    use crate::agent::state::PendingWorkKey;
    store.read::<PendingWorkKey>().map_or(false, |state| state.has_pending)
}
/// Converts an inbox payload into messages and appends them to the transcript.
fn push_inbox_payload(messages: &mut Vec<std::sync::Arc<Message>>, payload: &serde_json::Value) {
    messages.extend(
        inbox_payload_messages(payload)
            .into_iter()
            .map(std::sync::Arc::new),
    );
}
/// Applies one inbox payload to the transcript.
///
/// Compaction events are consumed by `try_consume_compaction_event` and add
/// no new input; every other payload is injected as message(s). Returns
/// `true` when new messages were appended.
fn apply_inbox_payload(
    messages: &mut Vec<std::sync::Arc<Message>>,
    payload: &serde_json::Value,
    store: &StateStore,
) -> bool {
    let consumed_as_compaction = try_consume_compaction_event(messages, payload, store);
    if !consumed_as_compaction {
        push_inbox_payload(messages, payload);
    }
    !consumed_as_compaction
}
/// Drives a single agent run to completion.
///
/// High-level flow:
/// 1. Destructure params and prepare the run (resolve the agent, seed the
///    message transcript).
/// 2. Register frontend tools, start or resume the run lifecycle, emit
///    `RunStart`, replay any suspended state, and run the `RunStart` phase.
/// 3. Execute steps inside `'run_loop` until a `TerminationReason` is
///    produced.
/// 4. Finalize the lifecycle (without clobbering a Waiting state), run the
///    `RunEnd` phase, persist the final checkpoint, and emit `RunFinish`.
#[tracing::instrument(skip_all, fields(agent_id = %params.agent_id, run_id = %params.run_identity.run_id))]
pub(super) async fn run_agent_loop_impl(
    params: AgentLoopParams<'_>,
    thread_ctx: Option<crate::ThreadContextSnapshot>,
) -> Result<AgentRunResult, AgentLoopError> {
    let AgentLoopParams {
        resolver,
        agent_id: initial_agent_id,
        runtime,
        sink,
        checkpoint_store,
        messages: initial_messages,
        run_identity,
        cancellation_token,
        decision_rx,
        overrides: initial_overrides,
        frontend_tools,
        mut inbox,
        is_continuation,
    } = params;
    let store = runtime.store();
    let run_overrides = initial_overrides;
    let mut decision_rx = decision_rx;
    let run_created_at = now_ms();
    // Token usage accumulated across every step of the run; threaded into
    // each StepContext below and persisted with checkpoints.
    let mut total_input_tokens: u64 = 0;
    let mut total_output_tokens: u64 = 0;
    let PreparedRun {
        mut agent,
        mut messages,
    } = prepare_run(
        resolver,
        runtime,
        initial_agent_id,
        initial_messages,
        &run_identity,
    )
    .await?;
    // Messages up to this index are run input; anything appended past it is
    // output produced by this run (see `latest_run_response`).
    let input_message_count = messages.len();
    // Merge frontend-declared tools into the agent's tool set, keyed by id.
    for desc in frontend_tools {
        let id = desc.id.clone();
        agent.tools.insert(
            id,
            std::sync::Arc::new(awaken_contract::contract::tool::FrontEndTool::new(desc)),
        );
    }
    let mut steps: usize = 0;
    let mut truncation_state = TruncationState::new();
    // A continuation resumes an existing lifecycle record; a fresh run
    // starts a new one with this run's id.
    if is_continuation {
        commit_update::<RunLifecycle>(
            store,
            RunLifecycleUpdate::SetRunning {
                updated_at: now_ms(),
            },
        )?;
    } else {
        commit_update::<RunLifecycle>(
            store,
            RunLifecycleUpdate::Start {
                run_id: run_identity.run_id.clone(),
                updated_at: now_ms(),
            },
        )?;
    }
    sink.emit(AgentEvent::RunStart {
        thread_id: run_identity.thread_id.clone(),
        run_id: run_identity.run_id.clone(),
        parent_run_id: run_identity.parent_run_id.clone(),
        identity: Some(run_identity.clone()),
    })
    .await;
    // If a previous run suspended, replay its recorded resume state into the
    // transcript before stepping.
    detect_and_replay_resume(&agent, runtime, &run_identity, &mut messages, sink.clone()).await?;
    // Run the RunStart phase hooks. Cancellation here ends the run cleanly
    // with zero steps; any other phase error aborts the run.
    match runtime
        .run_phase_with_context(
            &agent.env,
            step::make_ctx(
                Phase::RunStart,
                &messages,
                &run_identity,
                store,
                cancellation_token.as_ref(),
            ),
        )
        .await
    {
        Ok(_) => {}
        Err(awaken_contract::StateError::Cancelled) => {
            return Ok(AgentRunResult {
                run_id: run_identity.run_id.clone(),
                response: String::new(),
                termination: TerminationReason::Cancelled,
                steps: 0,
            });
        }
        Err(e) => return Err(AgentLoopError::PhaseError(e)),
    }
    // Main step loop. Each iteration runs one step; arms below either
    // `continue` for another step or `break` with a TerminationReason.
    let termination = 'run_loop: loop {
        steps += 1;
        tracing::info!(step = steps, "step_start");
        // Handoff (feature-gated): if the store names an active agent other
        // than the current one, resolve it, deactivate the old agent's
        // plugins, activate the new agent's plugins, and swap agents before
        // this step runs. A resolve failure terminates the run as Blocked.
        #[cfg(feature = "handoff")]
        if let Some(Some(active_id)) =
            store.read::<awaken_contract::contract::active_agent::ActiveAgentIdKey>()
            && active_id != agent.id()
        {
            match resolver.resolve(&active_id) {
                Ok(resolved) => {
                    // Register the incoming agent's state keys before any
                    // plugin mutations touch them.
                    if !resolved.env.key_registrations.is_empty() {
                        store
                            .register_keys(&resolved.env.key_registrations)
                            .map_err(AgentLoopError::PhaseError)?;
                    }
                    // Deactivate outgoing plugins, committing their batch
                    // only if they actually produced mutations.
                    {
                        let mut deactivate_patch = MutationBatch::new();
                        for plugin in &agent.env.plugins {
                            plugin
                                .on_deactivate(&mut deactivate_patch)
                                .map_err(AgentLoopError::PhaseError)?;
                        }
                        if !deactivate_patch.is_empty() {
                            store
                                .commit(deactivate_patch)
                                .map_err(AgentLoopError::PhaseError)?;
                        }
                    }
                    // Activate incoming plugins against the resolved spec.
                    {
                        let mut activate_patch = MutationBatch::new();
                        for plugin in &resolved.env.plugins {
                            plugin
                                .on_activate(&resolved.spec, &mut activate_patch)
                                .map_err(AgentLoopError::PhaseError)?;
                        }
                        if !activate_patch.is_empty() {
                            store
                                .commit(activate_patch)
                                .map_err(AgentLoopError::PhaseError)?;
                        }
                    }
                    tracing::info!(from = %agent.id(), to = %active_id, "agent_handoff");
                    agent = resolved;
                }
                Err(e) => {
                    tracing::error!(agent_id = %active_id, error = %e, "handoff_resolve_failed");
                    break TerminationReason::Blocked(format!("handoff resolve failed: {e}"));
                }
            }
        }
        sink.emit(AgentEvent::StepStart {
            message_id: gen_message_id(),
        })
        .await;
        // Tool-call state is per-step; clear leftovers from the last step.
        commit_update::<ToolCallStates>(store, ToolCallStatesUpdate::Clear)?;
        let mut step_ctx = StepContext {
            agent: &mut agent,
            messages: &mut messages,
            runtime,
            sink: sink.clone(),
            checkpoint_store,
            run_identity: &run_identity,
            input_message_count,
            cancellation_token: cancellation_token.as_ref(),
            run_overrides: &run_overrides,
            total_input_tokens: &mut total_input_tokens,
            total_output_tokens: &mut total_output_tokens,
            truncation_state: &mut truncation_state,
            run_created_at,
            thread_ctx: thread_ctx.as_ref(),
        };
        // A Cancelled phase error mid-step is normalized into the Cancelled
        // outcome so the arm below can still persist step completion.
        let step_result = match execute_step(&mut step_ctx).await {
            Ok(outcome) => outcome,
            Err(AgentLoopError::PhaseError(awaken_contract::StateError::Cancelled)) => {
                StepOutcome::Cancelled
            }
            Err(e) => return Err(e),
        };
        match step_result {
            // Persist what the step produced, then terminate as Cancelled.
            StepOutcome::Cancelled => {
                complete_step(StepCompletion {
                    store,
                    runtime,
                    env: &agent.env,
                    sink: sink.as_ref(),
                    checkpoint_store,
                    messages: &messages,
                    input_message_count,
                    run_identity: &run_identity,
                    run_created_at,
                    total_input_tokens,
                    total_output_tokens,
                    thread_ctx: thread_ctx.as_ref(),
                })
                .await?;
                break TerminationReason::Cancelled;
            }
            // The model produced no further work. Drain the inbox first:
            // payloads consumed as compaction events do not count as new
            // input (apply_inbox_payload returns false for those).
            StepOutcome::NaturalEnd => {
                let mut has_new_messages = false;
                if let Some(ref mut inbox) = inbox {
                    for msg in inbox.drain() {
                        if apply_inbox_payload(&mut messages, &msg, store) {
                            has_new_messages = true;
                        }
                    }
                }
                // Fresh input arrived while the step ran — take another step.
                if has_new_messages {
                    continue;
                }
                if has_pending_work(store) {
                    // Subagent runs block in place on the inbox for the next
                    // background-task event instead of parking the lifecycle.
                    if run_identity.origin()
                        == awaken_contract::contract::identity::RunOrigin::Subagent
                    {
                        if let Some(ref mut inbox) = inbox {
                            match inbox.recv_or_cancel(cancellation_token.as_ref()).await {
                                Some(msg) => {
                                    apply_inbox_payload(&mut messages, &msg, store);
                                    // Pick up anything else queued behind it.
                                    for extra in inbox.drain() {
                                        apply_inbox_payload(&mut messages, &extra, store);
                                    }
                                    continue;
                                }
                                None => break TerminationReason::Cancelled,
                            }
                        }
                    }
                    // Non-subagent (or no inbox): park as Waiting so a later
                    // continuation can resume when tasks complete.
                    commit_update::<RunLifecycle>(
                        store,
                        RunLifecycleUpdate::SetWaiting {
                            updated_at: now_ms(),
                            pause_reason: "awaiting_tasks".into(),
                        },
                    )?;
                    break TerminationReason::NaturalEnd;
                } else {
                    break TerminationReason::NaturalEnd;
                }
            }
            StepOutcome::Blocked(reason) => {
                complete_step(StepCompletion {
                    store,
                    runtime,
                    env: &agent.env,
                    sink: sink.as_ref(),
                    checkpoint_store,
                    messages: &messages,
                    input_message_count,
                    run_identity: &run_identity,
                    run_created_at,
                    total_input_tokens,
                    total_output_tokens,
                    thread_ctx: thread_ctx.as_ref(),
                })
                .await?;
                break TerminationReason::Blocked(reason);
            }
            StepOutcome::Terminated(reason) => {
                complete_step(StepCompletion {
                    store,
                    runtime,
                    env: &agent.env,
                    sink: sink.as_ref(),
                    checkpoint_store,
                    messages: &messages,
                    input_message_count,
                    run_identity: &run_identity,
                    run_created_at,
                    total_input_tokens,
                    total_output_tokens,
                    thread_ctx: thread_ctx.as_ref(),
                })
                .await?;
                break reason;
            }
            // A tool call suspended the run. Park the lifecycle, persist,
            // emit an interim RunFinish(Suspended), then wait for a resume
            // decision / inbox traffic / cancellation.
            StepOutcome::Suspended => {
                commit_update::<RunLifecycle>(
                    store,
                    RunLifecycleUpdate::SetWaiting {
                        updated_at: now_ms(),
                        pause_reason: "suspended".into(),
                    },
                )?;
                complete_step(StepCompletion {
                    store,
                    runtime,
                    env: &agent.env,
                    sink: sink.as_ref(),
                    checkpoint_store,
                    messages: &messages,
                    input_message_count,
                    run_identity: &run_identity,
                    run_created_at,
                    total_input_tokens,
                    total_output_tokens,
                    thread_ctx: thread_ctx.as_ref(),
                })
                .await?;
                emit_state_snapshot(store, sink.as_ref()).await;
                sink.emit(AgentEvent::RunFinish {
                    thread_id: run_identity.thread_id.clone(),
                    run_id: run_identity.run_id.clone(),
                    identity: Some(run_identity.clone()),
                    result: None,
                    termination: TerminationReason::Suspended,
                })
                .await;
                // Inner wait loop: keeps waiting until the run truly resumes
                // or is cancelled / left suspended.
                loop {
                    match wait_for_resume_or_cancel(
                        decision_rx.as_mut(),
                        inbox.as_mut(),
                        cancellation_token.as_ref(),
                        runtime,
                    )
                    .await?
                    {
                        WaitOutcome::Resumed => {
                            // Re-announce the run and replay resume state.
                            sink.emit(AgentEvent::RunStart {
                                thread_id: run_identity.thread_id.clone(),
                                run_id: run_identity.run_id.clone(),
                                parent_run_id: run_identity.parent_run_id.clone(),
                                identity: Some(run_identity.clone()),
                            })
                            .await;
                            detect_and_replay_resume(
                                &agent,
                                runtime,
                                &run_identity,
                                &mut messages,
                                sink.clone(),
                            )
                            .await?;
                            // Still-suspended calls remain: re-emit the
                            // suspended RunFinish and keep waiting.
                            if has_suspended_calls(store) {
                                emit_state_snapshot(store, sink.as_ref()).await;
                                sink.emit(AgentEvent::RunFinish {
                                    thread_id: run_identity.thread_id.clone(),
                                    run_id: run_identity.run_id.clone(),
                                    identity: Some(run_identity.clone()),
                                    result: None,
                                    termination: TerminationReason::Suspended,
                                })
                                .await;
                                continue;
                            }
                            commit_update::<RunLifecycle>(
                                store,
                                RunLifecycleUpdate::SetRunning {
                                    updated_at: now_ms(),
                                },
                            )?;
                            continue 'run_loop;
                        }
                        // Inbox traffic while suspended: inject the events
                        // and checkpoint, but stay in the wait loop.
                        // NOTE(review): this bypasses apply_inbox_payload, so
                        // compaction events would be injected as messages
                        // here — confirm intentional.
                        WaitOutcome::InboxMessages(events) => {
                            for event in events {
                                push_inbox_payload(&mut messages, &event);
                            }
                            persist_checkpoint(CheckpointPersist {
                                store,
                                checkpoint_store,
                                messages: &messages,
                                input_message_count,
                                run_identity: &run_identity,
                                run_created_at,
                                total_input_tokens,
                                total_output_tokens,
                                termination_reason: None,
                                final_output: None,
                                error_payload: None,
                                thread_ctx: thread_ctx.as_ref(),
                            })
                            .await?;
                            continue;
                        }
                        WaitOutcome::Cancelled => {
                            break 'run_loop TerminationReason::Cancelled;
                        }
                        // No decision channel exists, so the run can never be
                        // resumed from here — leave it suspended.
                        WaitOutcome::NoDecisionChannel => {
                            break 'run_loop TerminationReason::Suspended;
                        }
                    }
                }
            }
            // Normal case: step finished with more work to do. Persist,
            // pick up queued inbox payloads, and honor any termination the
            // store recorded during the step.
            // NOTE(review): like the wait loop above, this drain bypasses
            // apply_inbox_payload (no compaction handling) — confirm.
            StepOutcome::Continue => {
                complete_step(StepCompletion {
                    store,
                    runtime,
                    env: &agent.env,
                    sink: sink.as_ref(),
                    checkpoint_store,
                    messages: &messages,
                    input_message_count,
                    run_identity: &run_identity,
                    run_created_at,
                    total_input_tokens,
                    total_output_tokens,
                    thread_ctx: thread_ctx.as_ref(),
                })
                .await?;
                if let Some(ref mut inbox) = inbox {
                    for msg in inbox.drain() {
                        push_inbox_payload(&mut messages, &msg);
                    }
                }
                if let Some(reason) = check_termination(store) {
                    break reason;
                }
            }
        }
    };
    tracing::warn!(reason = ?termination, "run_terminated");
    // Only mark the lifecycle Done for terminal statuses, and never clobber
    // a Waiting state (e.g. "awaiting_tasks" set in the NaturalEnd arm).
    let lifecycle_now = store.read::<RunLifecycle>().map(|s| s.status);
    let (target_status, done_reason) = termination.to_run_status();
    if target_status.is_terminal() && lifecycle_now != Some(RunStatus::Waiting) {
        commit_update::<RunLifecycle>(
            store,
            RunLifecycleUpdate::Done {
                done_reason: done_reason.unwrap_or_else(|| "unknown".into()),
                updated_at: now_ms(),
            },
        )?;
    }
    // RunEnd phase hooks run even after cancellation; Cancelled is ignored
    // here so teardown still completes.
    match runtime
        .run_phase_with_context(
            &agent.env,
            step::make_ctx(
                Phase::RunEnd,
                &messages,
                &run_identity,
                store,
                cancellation_token.as_ref(),
            ),
        )
        .await
    {
        Ok(_) | Err(awaken_contract::StateError::Cancelled) => {}
        Err(e) => return Err(AgentLoopError::PhaseError(e)),
    }
    // Final checkpoint carries the termination reason, the last assistant
    // response (if any), and an error payload for Error terminations.
    let response = latest_run_response(&messages, input_message_count);
    persist_checkpoint(CheckpointPersist {
        store,
        checkpoint_store,
        messages: messages.as_slice(),
        input_message_count,
        run_identity: &run_identity,
        run_created_at,
        total_input_tokens,
        total_output_tokens,
        termination_reason: Some(termination.clone()),
        final_output: (!response.is_empty()).then(|| response.clone()),
        error_payload: match &termination {
            TerminationReason::Error(message) => Some(serde_json::json!({ "message": message })),
            _ => None,
        },
        thread_ctx: thread_ctx.as_ref(),
    })
    .await?;
    emit_state_snapshot(store, sink.as_ref()).await;
    // Report the status actually stored in the lifecycle (which may be
    // Waiting rather than Done) in the RunFinish result payload.
    let lifecycle_final = store.read::<RunLifecycle>();
    let run_status = lifecycle_final
        .as_ref()
        .map(|l| l.status)
        .unwrap_or(RunStatus::Done);
    let status_reason = lifecycle_final.and_then(|l| l.status_reason);
    let status_str = match run_status {
        RunStatus::Created => "created",
        RunStatus::Running => "running",
        RunStatus::Waiting => "waiting",
        RunStatus::Done => "done",
    };
    let mut result_json = serde_json::json!({"response": response, "status": status_str});
    if let Some(reason) = &status_reason {
        result_json["status_reason"] = serde_json::json!(reason);
    }
    sink.emit(AgentEvent::RunFinish {
        thread_id: run_identity.thread_id.clone(),
        run_id: run_identity.run_id.clone(),
        identity: Some(run_identity.clone()),
        result: Some(result_json),
        termination: termination.clone(),
    })
    .await;
    Ok(AgentRunResult {
        run_id: run_identity.run_id.clone(),
        response,
        termination,
        steps,
    })
}
/// Returns the text of the last assistant message produced during this run.
///
/// Only messages appended after `input_message_count` (i.e. this run's
/// output) are considered; returns an empty string when none qualify.
fn latest_run_response(messages: &[std::sync::Arc<Message>], input_message_count: usize) -> String {
    let current_run = messages.get(input_message_count..).unwrap_or_default();
    for message in current_run.iter().rev() {
        if message.role == Role::Assistant {
            return message.text();
        }
    }
    String::new()
}
#[cfg(test)]
mod tests {
    use super::*;

    // `latest_run_response` must only look at messages produced during the
    // current run (past `input_message_count`) and skip tool outputs.
    #[test]
    fn latest_run_response_uses_current_run_non_tool_output() {
        let messages = vec![
            std::sync::Arc::new(Message::user("current request")),
            std::sync::Arc::new(Message::assistant("previous context response")),
            std::sync::Arc::new(Message::assistant("checking")),
            std::sync::Arc::new(Message::tool("call-1", "tool result")),
            std::sync::Arc::new(Message::assistant("final answer")),
            std::sync::Arc::new(Message::tool("call-2", "late tool result")),
        ];
        assert_eq!(latest_run_response(&messages, 2), "final answer");
        assert_eq!(latest_run_response(&messages, messages.len()), "");
    }

    mod pending_work_tests {
        use super::*;
        use crate::agent::state::PendingWorkKey;
        // Imported directly: the parent module only brings MutationBatch in
        // under `#[cfg(feature = "handoff")]`, so relying on `use super::*`
        // would break test compilation when that feature is disabled.
        use crate::state::MutationBatch;

        // Builds a store with the loop-state plugin installed so the
        // lifecycle / pending-work keys are registered.
        fn store_with_loop_state() -> StateStore {
            let store = StateStore::new();
            store
                .install_plugin(crate::loop_runner::LoopStatePlugin)
                .unwrap();
            store
        }

        #[test]
        fn default_no_pending_work() {
            let store = store_with_loop_state();
            assert!(!has_pending_work(&store));
        }

        #[test]
        fn pending_work_set_true() {
            let store = store_with_loop_state();
            let mut batch = MutationBatch::new();
            batch.update::<PendingWorkKey>(true);
            store.commit(batch).unwrap();
            assert!(has_pending_work(&store));
        }

        #[test]
        fn pending_work_cleared() {
            let store = store_with_loop_state();
            let mut batch = MutationBatch::new();
            batch.update::<PendingWorkKey>(true);
            store.commit(batch).unwrap();
            assert!(has_pending_work(&store));
            let mut batch2 = MutationBatch::new();
            batch2.update::<PendingWorkKey>(false);
            store.commit(batch2).unwrap();
            assert!(!has_pending_work(&store));
        }
    }

    mod check_termination_tests {
        use super::*;
        use crate::agent::state::{RunLifecycle, RunLifecycleUpdate};
        use crate::loop_runner::checkpoint::check_termination;
        use awaken_contract::contract::lifecycle::TerminationReason;

        fn store_with_lifecycle() -> StateStore {
            let store = StateStore::new();
            store
                .install_plugin(crate::loop_runner::LoopStatePlugin)
                .unwrap();
            store
        }

        #[test]
        fn running_returns_none() {
            let store = store_with_lifecycle();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::Start {
                    run_id: "r1".into(),
                    updated_at: 100,
                },
            )
            .unwrap();
            assert!(check_termination(&store).is_none());
        }

        #[test]
        fn done_returns_termination_reason() {
            let store = store_with_lifecycle();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::Start {
                    run_id: "r1".into(),
                    updated_at: 100,
                },
            )
            .unwrap();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::Done {
                    done_reason: "natural".into(),
                    updated_at: 200,
                },
            )
            .unwrap();
            assert!(matches!(
                check_termination(&store),
                Some(TerminationReason::NaturalEnd)
            ));
        }

        // The "suspended" pause reason must surface as a termination so the
        // loop stops; "awaiting_tasks" (below) must not.
        #[test]
        fn waiting_suspended_returns_suspended() {
            let store = store_with_lifecycle();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::Start {
                    run_id: "r1".into(),
                    updated_at: 100,
                },
            )
            .unwrap();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::SetWaiting {
                    updated_at: 200,
                    pause_reason: "suspended".into(),
                },
            )
            .unwrap();
            assert!(matches!(
                check_termination(&store),
                Some(TerminationReason::Suspended)
            ));
        }

        #[test]
        fn waiting_awaiting_tasks_returns_none() {
            let store = store_with_lifecycle();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::Start {
                    run_id: "r1".into(),
                    updated_at: 100,
                },
            )
            .unwrap();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::SetWaiting {
                    updated_at: 200,
                    pause_reason: "awaiting_tasks".into(),
                },
            )
            .unwrap();
            assert!(
                check_termination(&store).is_none(),
                "awaiting_tasks should return None"
            );
        }
    }

    mod termination_sequence_tests {
        use super::*;
        use crate::agent::state::{RunLifecycle, RunLifecycleUpdate};

        fn store_with_lifecycle() -> StateStore {
            let store = StateStore::new();
            store
                .install_plugin(crate::loop_runner::LoopStatePlugin)
                .unwrap();
            store
        }

        // Mirrors the guard in `run_agent_loop_impl`: a terminal status must
        // not overwrite a Waiting lifecycle with Done.
        #[test]
        fn waiting_state_not_overwritten_by_done() {
            let store = store_with_lifecycle();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::Start {
                    run_id: "r1".into(),
                    updated_at: 100,
                },
            )
            .unwrap();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::SetWaiting {
                    updated_at: 200,
                    pause_reason: "awaiting_tasks".into(),
                },
            )
            .unwrap();
            let lifecycle_now = store.read::<RunLifecycle>().map(|s| s.status);
            let termination = TerminationReason::NaturalEnd;
            let (target_status, _) = termination.to_run_status();
            if target_status.is_terminal() && lifecycle_now != Some(RunStatus::Waiting) {
                panic!("should not reach here — lifecycle is Waiting");
            }
            let state = store.read::<RunLifecycle>().unwrap();
            assert_eq!(state.status, RunStatus::Waiting);
            assert_eq!(state.status_reason.as_deref(), Some("awaiting_tasks"));
        }
    }

    mod persist_checkpoint_tests {
        use super::*;
        use crate::agent::state::{RunLifecycle, RunLifecycleUpdate};

        fn store_with_lifecycle() -> StateStore {
            let store = StateStore::new();
            store
                .install_plugin(crate::loop_runner::LoopStatePlugin)
                .unwrap();
            store
        }

        #[test]
        fn lifecycle_stores_status_reason_for_waiting() {
            let store = store_with_lifecycle();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::Start {
                    run_id: "r1".into(),
                    updated_at: 100,
                },
            )
            .unwrap();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::SetWaiting {
                    updated_at: 200,
                    pause_reason: "awaiting_tasks".into(),
                },
            )
            .unwrap();
            let lifecycle = store.read::<RunLifecycle>().unwrap();
            assert_eq!(lifecycle.status_reason.as_deref(), Some("awaiting_tasks"));
        }

        #[test]
        fn lifecycle_stores_status_reason_for_done() {
            let store = store_with_lifecycle();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::Start {
                    run_id: "r1".into(),
                    updated_at: 100,
                },
            )
            .unwrap();
            crate::loop_runner::commit_update::<RunLifecycle>(
                &store,
                RunLifecycleUpdate::Done {
                    done_reason: "natural".into(),
                    updated_at: 200,
                },
            )
            .unwrap();
            let lifecycle = store.read::<RunLifecycle>().unwrap();
            assert_eq!(lifecycle.status_reason.as_deref(), Some("natural"));
        }
    }

    mod inbox_drain_tests {
        use crate::inbox::inbox_channel;

        #[test]
        fn drain_returns_empty_when_no_messages() {
            let (_tx, mut rx) = inbox_channel();
            let msgs = rx.drain();
            assert!(msgs.is_empty());
        }

        #[test]
        fn drain_returns_all_pending_messages() {
            let (tx, mut rx) = inbox_channel();
            tx.send(serde_json::json!({"event": "a"}));
            tx.send(serde_json::json!({"event": "b"}));
            tx.send(serde_json::json!({"event": "c"}));
            let msgs = rx.drain();
            assert_eq!(msgs.len(), 3);
            assert_eq!(msgs[0]["event"], "a");
            assert_eq!(msgs[2]["event"], "c");
            // A second drain must come back empty.
            assert!(rx.drain().is_empty());
        }

        #[test]
        fn drain_after_sender_drop_returns_buffered() {
            let (tx, mut rx) = inbox_channel();
            tx.send(serde_json::json!("buffered"));
            drop(tx);
            let msgs = rx.drain();
            assert_eq!(msgs.len(), 1);
            assert_eq!(msgs[0], "buffered");
        }

        // Inbox events become internal user messages so they steer the model
        // without being surfaced to the end user.
        #[test]
        fn inbox_events_injected_as_internal_user_messages() {
            let (tx, mut rx) = inbox_channel();
            tx.send(
                serde_json::json!({"kind": "custom", "event_type": "progress", "task_id": "bg_0"}),
            );
            let msgs = rx.drain();
            for msg in &msgs {
                let m = crate::inbox::inbox_event_message(msg);
                assert_eq!(m.role, awaken_contract::contract::message::Role::User);
                assert_eq!(
                    m.visibility,
                    awaken_contract::contract::message::Visibility::Internal
                );
            }
        }

        #[test]
        fn inbox_events_wrapped_in_background_task_event_tag() {
            let event = serde_json::json!({
                "kind": "custom",
                "task_id": "bg_42",
                "event_type": "data_ready",
                "payload": {"rows": 100}
            });
            let m = crate::inbox::inbox_event_message(&event);
            let text = m.text();
            assert!(
                text.contains("<background-task-event"),
                "should have opening tag: {text}"
            );
            assert!(
                text.contains("</background-task-event>"),
                "should have closing tag: {text}"
            );
            assert!(
                text.contains("kind=\"custom\""),
                "tag should contain kind: {text}"
            );
            assert!(
                text.contains("task_id=\"bg_42\""),
                "tag should contain task_id: {text}"
            );
        }
    }
}