use super::helpers::{pending_tool_index, send_event, turns_to_u32};
use super::tool_execution::{append_tool_results, execute_confirmed_tool, execute_tool_call};
use super::turn::execute_turn;
use super::types::{
ConfirmedToolExecutionContext, ConvertTurnResultParams, ExecuteTurnParameters,
InitializedState, InternalTurnResult, PersistentDoneParams, ResumeData,
ResumeProcessingParameters, ResumeProcessingResult, ResumeSummaryMetrics, RunLoopParameters,
RunLoopResumeParams, RunLoopTurnResultParams, RunLoopTurnsParams, SingleTurnResumeParams,
ToolCallExecutionContext, ToolExecutionOutcome, TurnContext, TurnParameters,
};
use crate::types::TurnOptions;
use crate::authority::EventAuthority;
use crate::context::{CompactionConfig, ContextCompactor};
use crate::events::AgentEvent;
use crate::hooks::AgentHooks;
use crate::llm::{Content, ContentBlock, LlmProvider, Message, Role, StopReason};
use crate::stores::{EventStore, MessageStore, StateStore, ToolExecutionStore};
use crate::tools::{ToolContext, ToolRegistry};
use crate::types::{
AgentConfig, AgentContinuation, AgentError, AgentInput, AgentRunState, AgentState,
ContinuationEnvelope, ThreadId, TokenUsage, ToolResult, TurnOutcome, TurnSummary,
};
use agent_sdk_foundation::audit::AuditProvenance;
use log::warn;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
/// Decision produced after a loop turn's result has been handled; consumed by
/// the loop in `run_loop_turns`.
enum RunLoopTurnAction {
/// Proceed to the next loop iteration.
Continue,
/// Leave the loop without a final state (`run_loop_turns` returns `None`).
FinishRun,
/// Leave the loop and hand the contained state back to the caller.
Return(AgentRunState),
}
/// Attaches the run's cancellation token to `tool_context` and, when
/// `tool_timeout_ms` is set, a per-tool timeout of that many milliseconds.
fn apply_tool_boundary_controls<Ctx>(
    tool_context: ToolContext<Ctx>,
    cancel_token: &tokio_util::sync::CancellationToken,
    tool_timeout_ms: Option<u64>,
) -> ToolContext<Ctx> {
    let with_cancel = tool_context.with_cancel_token(cancel_token.clone());
    let Some(timeout_ms) = tool_timeout_ms else {
        // No timeout configured: only the cancel token is applied.
        return with_cancel;
    };
    with_cancel.with_tool_timeout(std::time::Duration::from_millis(timeout_ms))
}
/// Builds the [`InitializedState`] for a run from the caller-supplied input.
///
/// * `Text` / `Message` — calls `recover_orphaned_tool_use` on the thread and
///   then appends the new user message via `initialize_from_message`.
/// * `Resume` — unwraps and validates the continuation envelope, checks that
///   it belongs to `thread_id`, and carries the confirmation decision forward
///   in `resume_data`.
/// * `SubmitToolResults` — unwraps the envelope and delegates to
///   `initialize_from_tool_results`.
/// * `Continue` — requires previously stored state for the thread, then calls
///   `recover_orphaned_tool_use`.
///
/// # Errors
///
/// Returns a non-recoverable [`AgentError`] when the envelope fails
/// validation, the continuation targets a different thread, no state exists
/// for `Continue`, the state store fails to load, or any delegated step fails.
pub(super) async fn initialize_from_input<M, S>(
    input: AgentInput,
    thread_id: &ThreadId,
    message_store: &Arc<M>,
    state_store: &Arc<S>,
    execution_store: Option<&Arc<dyn ToolExecutionStore>>,
    audit_sink: &Arc<dyn crate::hooks::ToolAuditSink>,
    provenance: &agent_sdk_foundation::audit::AuditProvenance,
) -> Result<InitializedState, AgentError>
where
    M: MessageStore,
    S: StateStore,
{
    match input {
        AgentInput::Text(user_message) => {
            recover_orphaned_tool_use(thread_id, message_store).await?;
            let msg = Message::user(&user_message);
            initialize_from_message(msg, thread_id, message_store, state_store).await
        }
        AgentInput::Message(blocks) => {
            // A block-based user message is appended to the same history as a
            // text message, so it gets the same recovery pass first.
            recover_orphaned_tool_use(thread_id, message_store).await?;
            let msg = Message::user_with_content(blocks);
            initialize_from_message(msg, thread_id, message_store, state_store).await
        }
        AgentInput::Resume {
            continuation: envelope,
            tool_call_id,
            confirmed,
            rejection_reason,
        } => {
            let continuation = Box::new(
                envelope
                    .unwrap_validated()
                    .map_err(|msg| AgentError::new(msg, false))?,
            );
            // A continuation may only be resumed on the thread it was created for.
            if continuation.thread_id != *thread_id {
                return Err(AgentError::new(
                    format!(
                        "Thread ID mismatch: continuation is for {}, but resuming on {}",
                        continuation.thread_id, thread_id
                    ),
                    false,
                ));
            }
            Ok(InitializedState {
                turn: continuation.turn,
                total_usage: continuation.total_usage.clone(),
                state: continuation.state.clone(),
                resume_data: Some(ResumeData {
                    continuation,
                    tool_call_id,
                    confirmed,
                    rejection_reason,
                }),
            })
        }
        AgentInput::SubmitToolResults {
            continuation: envelope,
            results,
        } => {
            let continuation = Box::new(
                envelope
                    .unwrap_validated()
                    .map_err(|msg| AgentError::new(msg, false))?,
            );
            initialize_from_tool_results(
                continuation,
                results,
                thread_id,
                message_store,
                execution_store,
                audit_sink,
                provenance,
            )
            .await
        }
        AgentInput::Continue => {
            // Continuing requires state persisted by an earlier run.
            let state = match state_store.load(thread_id).await {
                Ok(Some(s)) => s,
                Ok(None) => {
                    return Err(AgentError::new(
                        "Cannot continue: no state found for thread",
                        false,
                    ));
                }
                Err(e) => {
                    return Err(AgentError::new(format!("Failed to load state: {e}"), false));
                }
            };
            recover_orphaned_tool_use(thread_id, message_store).await?;
            Ok(InitializedState {
                turn: state.turn_count,
                total_usage: state.total_usage.clone(),
                state,
                resume_data: None,
            })
        }
    }
}
/// Loads (or creates) the thread's state and appends `user_msg` to the
/// message store.
///
/// The returned `turn` and `total_usage` always start from zero/default,
/// independent of what the loaded state contains.
///
/// # Errors
///
/// Returns a non-recoverable [`AgentError`] when the state store fails to
/// load or the message store fails to append.
async fn initialize_from_message<M, S>(
    user_msg: Message,
    thread_id: &ThreadId,
    message_store: &Arc<M>,
    state_store: &Arc<S>,
) -> Result<InitializedState, AgentError>
where
    M: MessageStore,
    S: StateStore,
{
    // A thread without stored state starts from a fresh `AgentState`.
    let state = state_store
        .load(thread_id)
        .await
        .map_err(|e| AgentError::new(format!("Failed to load state: {e}"), false))?
        .unwrap_or_else(|| AgentState::new(thread_id.clone()));

    message_store
        .append(thread_id, user_msg)
        .await
        .map_err(|e| AgentError::new(format!("Failed to append message: {e}"), false))?;

    Ok(InitializedState {
        turn: 0,
        total_usage: TokenUsage::default(),
        state,
        resume_data: None,
    })
}
/// Initializes a run from externally executed tool results.
///
/// Checks that the continuation belongs to `thread_id` and that `results`
/// matches its pending tool calls, then audits every result. Results whose
/// execution is already marked completed in the execution store are audited
/// as `Replayed` and skipped; the remaining ("fresh") results are audited as
/// `Completed`, appended to the message store, and recorded in the execution
/// store.
///
/// # Errors
///
/// Returns an error on thread ID mismatch, on failed validation, or when
/// appending the fresh results fails (each fresh result is then additionally
/// audited as `PersistenceFailed`).
async fn initialize_from_tool_results<M: MessageStore>(
continuation: Box<AgentContinuation>,
results: Vec<crate::types::ExternalToolResult>,
thread_id: &ThreadId,
message_store: &Arc<M>,
execution_store: Option<&Arc<dyn ToolExecutionStore>>,
audit_sink: &Arc<dyn crate::hooks::ToolAuditSink>,
provenance: &agent_sdk_foundation::audit::AuditProvenance,
) -> Result<InitializedState, AgentError> {
use agent_sdk_foundation::audit::ToolAuditOutcome;
if continuation.thread_id != *thread_id {
return Err(AgentError::new(
format!(
"Thread ID mismatch: continuation is for {}, but resuming on {}",
continuation.thread_id, thread_id
),
false,
));
}
validate_external_tool_results(&continuation, &results)?;
let tool_results: Vec<(String, crate::types::ToolResult)> = results
.into_iter()
.map(|r| (r.tool_call_id, r.result))
.collect();
let mut fresh_results: Vec<(String, crate::types::ToolResult)> =
Vec::with_capacity(tool_results.len());
for (tool_call_id, result) in tool_results {
// A missing store, a failed lookup, or an absent record all count as
// "not completed yet".
let already_completed = match execution_store {
Some(store) => store
.get_execution(&tool_call_id)
.await
.ok()
.flatten()
.is_some_and(|execution| execution.is_completed()),
None => false,
};
if already_completed {
emit_external_tool_audit(
audit_sink,
provenance,
&continuation,
&tool_call_id,
ToolAuditOutcome::Replayed {
result: result.clone(),
},
)
.await;
} else {
emit_external_tool_audit(
audit_sink,
provenance,
&continuation,
&tool_call_id,
ToolAuditOutcome::Completed {
result: result.clone(),
},
)
.await;
fresh_results.push((tool_call_id, result));
}
}
// Every submitted result was a replay: nothing new to append or record.
if fresh_results.is_empty() {
return Ok(InitializedState {
turn: continuation.turn,
total_usage: continuation.total_usage.clone(),
state: continuation.state.clone(),
resume_data: None,
});
}
if let Err(error) = append_tool_results(&fresh_results, thread_id, message_store).await {
// The results were already audited as completed above, so the failed
// append is audited per result before the error is returned.
for (tool_call_id, result) in &fresh_results {
emit_external_tool_audit(
audit_sink,
provenance,
&continuation,
tool_call_id,
ToolAuditOutcome::PersistenceFailed {
result: Some(result.clone()),
error: error.message.clone(),
},
)
.await;
}
return Err(error);
}
// Execution records are written only after the append succeeded.
for (tool_call_id, result) in &fresh_results {
record_external_tool_execution(
execution_store,
&continuation,
thread_id,
tool_call_id,
result,
)
.await;
}
Ok(InitializedState {
turn: continuation.turn,
total_usage: continuation.total_usage.clone(),
state: continuation.state.clone(),
resume_data: None,
})
}
/// Records an externally supplied tool result in the execution store, when
/// one is configured.
///
/// Tool name, display name and input come from the pending tool call with the
/// same ID; if none matches they are recorded as empty strings and JSON null.
/// A failure to record is logged as a warning and otherwise ignored.
async fn record_external_tool_execution(
    execution_store: Option<&Arc<dyn ToolExecutionStore>>,
    continuation: &AgentContinuation,
    thread_id: &ThreadId,
    tool_call_id: &str,
    result: &crate::types::ToolResult,
) {
    let Some(store) = execution_store else {
        return;
    };

    let matching_call = continuation
        .pending_tool_calls
        .iter()
        .find(|call| call.id == tool_call_id);
    let tool_name = matching_call
        .map(|call| call.name.clone())
        .unwrap_or_default();
    let display_name = matching_call
        .map(|call| call.display_name.clone())
        .unwrap_or_default();
    let input = matching_call.map_or(serde_json::Value::Null, |call| call.input.clone());

    // The record is created in-flight with the current time and completed
    // immediately with the supplied result.
    let recorded_at = time::OffsetDateTime::now_utc();
    let mut execution = crate::types::ToolExecution::new_in_flight(
        tool_call_id,
        thread_id.clone(),
        tool_name,
        display_name,
        input,
        recorded_at,
    );
    execution.complete(result.clone());

    if let Err(e) = store.record_execution(execution).await {
        warn!("Failed to record external tool execution (tool_call_id={tool_call_id}, error={e})");
    }
}
/// Sends one audit record for an externally executed tool call to
/// `audit_sink`.
///
/// Tool details are copied from the pending tool call with the same ID. When
/// no pending call matches, the record uses empty names, `ToolTier::Confirm`
/// and JSON null inputs.
async fn emit_external_tool_audit(
    audit_sink: &Arc<dyn crate::hooks::ToolAuditSink>,
    provenance: &agent_sdk_foundation::audit::AuditProvenance,
    continuation: &AgentContinuation,
    tool_call_id: &str,
    outcome: agent_sdk_foundation::audit::ToolAuditOutcome,
) {
    use crate::types::ToolTier;
    use agent_sdk_foundation::audit::{ToolAuditRecord, ToolAuditRecordParams};

    let matching_call = continuation
        .pending_tool_calls
        .iter()
        .find(|call| call.id == tool_call_id);

    let tool_name = matching_call
        .map(|call| call.name.clone())
        .unwrap_or_default();
    let display_name = matching_call
        .map(|call| call.display_name.clone())
        .unwrap_or_default();
    let tier = matching_call.map_or(ToolTier::Confirm, |call| call.tier);
    let requested_input =
        matching_call.map_or(serde_json::Value::Null, |call| call.input.clone());
    let effective_input =
        matching_call.map_or(serde_json::Value::Null, |call| call.effective_input.clone());

    let params = ToolAuditRecordParams {
        tool_call_id: tool_call_id.to_string(),
        tool_name,
        display_name,
        tier,
        requested_input,
        effective_input,
        turn: continuation.turn,
        provenance: provenance.clone(),
        outcome,
    };
    audit_sink.record(ToolAuditRecord::new(params)).await;
}
/// Checks that a continuation can be resumed with `tool_call_id`.
///
/// # Errors
///
/// Returns a non-recoverable [`AgentError`] when `awaiting_index` does not
/// point into `pending_tool_calls`, or when the awaited tool call's ID differs
/// from `tool_call_id`.
fn validate_resume_continuation(
    cont: &AgentContinuation,
    tool_call_id: &str,
) -> Result<(), AgentError> {
    let Some(awaiting_tool) = cont.pending_tool_calls.get(cont.awaiting_index) else {
        return Err(AgentError::new(
            format!(
                "Invalid continuation: awaiting_index {} out of bounds ({})",
                cont.awaiting_index,
                cont.pending_tool_calls.len()
            ),
            false,
        ));
    };

    if awaiting_tool.id == tool_call_id {
        return Ok(());
    }

    Err(AgentError::new(
        format!(
            "Tool call ID mismatch: expected {}, got {}",
            awaiting_tool.id, tool_call_id
        ),
        false,
    ))
}
/// Checks that `results` resolves the continuation's pending tool calls
/// exactly: every pending call has a result, and every result names a pending
/// call at most once.
///
/// # Errors
///
/// Returns a non-recoverable [`AgentError`] for, in this order: no pending
/// tool calls at all, the first pending call without a result, then — walking
/// `results` in order — an unknown or a duplicated tool call ID.
fn validate_external_tool_results(
    cont: &AgentContinuation,
    results: &[crate::types::ExternalToolResult],
) -> Result<(), AgentError> {
    let pending_calls = &cont.pending_tool_calls;
    if pending_calls.is_empty() {
        return Err(AgentError::new(
            "Invalid continuation: no pending tool calls to resolve",
            false,
        ));
    }

    // First pending call that no submitted result refers to.
    let unanswered = pending_calls
        .iter()
        .find(|call| results.iter().all(|r| r.tool_call_id != call.id));
    if let Some(call) = unanswered {
        return Err(AgentError::new(
            format!(
                "Missing result for tool call '{}' (tool '{}')",
                call.id, call.name,
            ),
            false,
        ));
    }

    let mut seen = HashSet::with_capacity(results.len());
    for submitted in results {
        let submitted_id = &submitted.tool_call_id;
        let is_pending = pending_calls.iter().any(|call| call.id == *submitted_id);
        if !is_pending {
            return Err(AgentError::new(
                format!(
                    "Unknown tool call ID '{}' — not in the pending tool calls",
                    submitted_id,
                ),
                false,
            ));
        }
        // `insert` returns false when the ID was already present.
        if !seen.insert(submitted_id) {
            return Err(AgentError::new(
                format!(
                    "Duplicate result for tool call ID '{}'",
                    submitted_id,
                ),
                false,
            ));
        }
    }
    Ok(())
}
/// Resumes a turn that was paused on a tool confirmation.
///
/// Validates that `tool_call_id` names the tool call the continuation is
/// waiting on, passes that call (with a rejection message when the user did
/// not confirm) to `execute_confirmed_tool`, and then processes the pending
/// tool calls that follow it. If one of those requires confirmation, the
/// `AwaitingConfirmation` result is returned immediately. Otherwise all tool
/// results are appended to the message store, a `TurnComplete` event is sent,
/// and `Completed` is returned with the continuation's turn usage and the
/// carried response metadata.
///
/// # Errors
///
/// Returns an error when validation fails, a tool execution step fails,
/// appending the results fails, or the `TurnComplete` event cannot be sent.
pub(super) async fn process_resume<Ctx, H, M>(
ResumeProcessingParameters {
resume_data,
turn,
total_usage,
state,
thread_id,
tool_context,
tools,
hooks,
event_store,
authority,
message_store,
execution_store,
audit_sink,
provenance,
}: ResumeProcessingParameters<'_, Ctx, H, M>,
) -> Result<ResumeProcessingResult, AgentError>
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
M: MessageStore,
{
let ResumeData {
continuation: cont,
tool_call_id,
confirmed,
rejection_reason,
} = resume_data;
validate_resume_continuation(&cont, &tool_call_id)?;
// Response metadata from the paused turn, reused for a follow-up
// continuation or for the completion metrics.
let carried_metadata = CarriedTurnMetadata {
response_id: cont.response_id.clone(),
stop_reason: cont.stop_reason,
tool_call_count: cont.pending_tool_calls.len(),
};
// Indexing is in bounds: validate_resume_continuation checked awaiting_index.
let awaiting_tool = &cont.pending_tool_calls[cont.awaiting_index];
// Start from the results gathered before the turn was paused.
let mut tool_results = cont.completed_results.clone();
// `Some(reason)` only when the user did not confirm; a default reason is
// used when none was supplied.
let rejection =
(!confirmed).then(|| rejection_reason.unwrap_or_else(|| "User rejected".to_string()));
let confirmed_ctx = ConfirmedToolExecutionContext {
tool_context,
thread_id,
tools,
hooks,
event_store,
turn,
authority,
execution_store,
audit_sink,
provenance,
};
let result = execute_confirmed_tool(awaiting_tool, rejection, &confirmed_ctx).await?;
tool_results.push((awaiting_tool.id.clone(), result));
// `Some(..)` here means a later tool call needs confirmation: return it
// without appending results or completing the turn.
if let Some(result) = execute_remaining_pending_tools(ExecuteRemainingParams {
cont: &cont,
tool_results: &mut tool_results,
tool_context,
thread_id,
tools,
hooks,
event_store,
turn,
authority,
execution_store,
audit_sink,
provenance,
total_usage,
state,
carried: &carried_metadata,
})
.await?
{
return Ok(result);
}
append_tool_results(&tool_results, thread_id, message_store).await?;
send_event(
event_store,
thread_id,
turn,
hooks,
authority,
AgentEvent::TurnComplete {
turn,
usage: cont.turn_usage.clone(),
},
)
.await?;
Ok(ResumeProcessingResult::Completed {
turn_usage: cont.turn_usage.clone(),
metrics: ResumeSummaryMetrics {
response_id: carried_metadata.response_id,
stop_reason: carried_metadata.stop_reason,
tool_call_count: carried_metadata.tool_call_count,
},
})
}
/// Response metadata copied from a continuation in `process_resume` so it can
/// be attached to a follow-up continuation or to the resume metrics.
struct CarriedTurnMetadata {
/// Copied from the continuation's `response_id`.
response_id: Option<String>,
/// Copied from the continuation's `stop_reason`.
stop_reason: Option<StopReason>,
/// Number of pending tool calls in the continuation.
tool_call_count: usize,
}
/// Borrowed inputs for `execute_remaining_pending_tools`.
struct ExecuteRemainingParams<'a, Ctx, H> {
/// Continuation being resumed; supplies the pending tool calls and turn usage.
cont: &'a AgentContinuation,
/// Results gathered so far; completed tools are pushed here, and the vector
/// is drained into the new continuation if a tool needs confirmation.
tool_results: &'a mut Vec<(String, ToolResult)>,
tool_context: &'a ToolContext<Ctx>,
thread_id: &'a ThreadId,
tools: &'a Arc<ToolRegistry<Ctx>>,
hooks: &'a Arc<H>,
event_store: &'a Arc<dyn EventStore>,
turn: usize,
authority: &'a Arc<dyn EventAuthority>,
execution_store: Option<&'a Arc<dyn ToolExecutionStore>>,
audit_sink: &'a Arc<dyn crate::hooks::ToolAuditSink>,
provenance: &'a AuditProvenance,
/// Cloned into a new continuation when a tool needs confirmation.
total_usage: &'a TokenUsage,
/// Cloned into a new continuation when a tool needs confirmation.
state: &'a AgentState,
/// Response metadata carried over from the paused turn.
carried: &'a CarriedTurnMetadata,
}
/// Executes the pending tool calls that come after the one just resolved.
///
/// Results of completed tools are pushed onto `tool_results`. When a tool
/// requires confirmation, returns `Some(AwaitingConfirmation)` carrying a new
/// continuation whose `awaiting_index` points at that tool and whose
/// `completed_results` takes over everything gathered so far (leaving
/// `tool_results` empty). Returns `Ok(None)` when every remaining tool
/// completed.
///
/// # Errors
///
/// Returns the error from a tool execution outcome, or from
/// `pending_tool_index` when the awaiting tool cannot be located.
async fn execute_remaining_pending_tools<Ctx, H>(
ExecuteRemainingParams {
cont,
tool_results,
tool_context,
thread_id,
tools,
hooks,
event_store,
turn,
authority,
execution_store,
audit_sink,
provenance,
total_usage,
state,
carried,
}: ExecuteRemainingParams<'_, Ctx, H>,
) -> Result<Option<ResumeProcessingResult>, AgentError>
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
{
let execution_ctx = ToolCallExecutionContext {
tool_context,
thread_id,
tools,
hooks,
event_store,
turn,
authority,
execution_store,
audit_sink,
provenance,
};
// Skip everything up to and including the tool call that was just resolved.
for pending in cont.pending_tool_calls.iter().skip(cont.awaiting_index + 1) {
match execute_tool_call(pending, &execution_ctx).await {
ToolExecutionOutcome::Completed { tool_id, result } => {
tool_results.push((tool_id, result));
}
ToolExecutionOutcome::RequiresConfirmation {
tool_id,
tool_name,
display_name,
input,
description,
listen_context,
} => {
let pending_idx = pending_tool_index(&cont.pending_tool_calls, &tool_id)?;
// The new continuation gets its own copy of the pending calls, with
// the listen context (if any) stored on the awaiting entry.
let mut pending_tool_calls = cont.pending_tool_calls.clone();
if let Some(context) = listen_context {
pending_tool_calls[pending_idx].listen_context = Some(context);
}
return Ok(Some(ResumeProcessingResult::AwaitingConfirmation {
tool_call_id: tool_id,
tool_name,
display_name,
input,
description,
continuation: Box::new(AgentContinuation {
thread_id: thread_id.clone(),
turn,
total_usage: total_usage.clone(),
turn_usage: cont.turn_usage.clone(),
pending_tool_calls,
awaiting_index: pending_idx,
// Moves the gathered results out, leaving the caller's vector empty.
completed_results: std::mem::take(tool_results),
state: state.clone(),
response_id: carried.response_id.clone(),
stop_reason: carried.stop_reason,
// NOTE(review): response_content is set to empty here rather than
// copied from `cont` — confirm this is intended.
response_content: Vec::new(),
}),
}));
}
ToolExecutionOutcome::Error(error) => return Err(error),
}
}
Ok(None)
}
/// Runs `process_resume` for the looping path and converts its result.
///
/// Returns `Ok(None)` when the resumed turn completed, or
/// `Ok(Some(AgentRunState::AwaitingConfirmation { .. }))` — with the
/// continuation wrapped in a [`ContinuationEnvelope`] — when another tool
/// call needs confirmation.
///
/// # Errors
///
/// Propagates any error returned by `process_resume`.
pub(super) async fn handle_run_loop_resume<Ctx, H, M>(
    params: RunLoopResumeParams<'_, Ctx, H, M>,
) -> Result<Option<AgentRunState>, AgentError>
where
    Ctx: Send + Sync + Clone + 'static,
    H: AgentHooks,
    M: MessageStore,
{
    let RunLoopResumeParams {
        resume_data,
        turn,
        total_usage,
        state,
        thread_id,
        tool_context,
        tools,
        hooks,
        event_store,
        authority,
        message_store,
        execution_store,
        audit_sink,
        provenance,
    } = params;

    let processed = process_resume(ResumeProcessingParameters {
        resume_data,
        turn,
        total_usage,
        state,
        thread_id,
        tool_context,
        tools,
        hooks,
        event_store,
        authority,
        message_store,
        execution_store,
        audit_sink,
        provenance,
    })
    .await?;

    Ok(match processed {
        ResumeProcessingResult::Completed { .. } => None,
        ResumeProcessingResult::AwaitingConfirmation {
            tool_call_id,
            tool_name,
            display_name,
            input,
            description,
            continuation,
        } => {
            let envelope = ContinuationEnvelope::wrap(*continuation);
            Some(AgentRunState::AwaitingConfirmation {
                tool_call_id,
                tool_name,
                display_name,
                input,
                description,
                continuation: Box::new(envelope),
            })
        }
    })
}
/// Calls `finish_turn` on the event store for `turn`.
///
/// # Errors
///
/// A store failure is converted into a non-recoverable [`AgentError`].
async fn finish_turn_or_error(
    event_store: &Arc<dyn EventStore>,
    thread_id: &ThreadId,
    turn: usize,
) -> Result<(), AgentError> {
    let finished = event_store.finish_turn(thread_id, turn).await;
    if let Err(error) = finished {
        return Err(AgentError::new(
            format!("Failed to finish turn event store: {error}"),
            false,
        ));
    }
    Ok(())
}
async fn initialize_run_loop_state<M, S>(
input: AgentInput,
thread_id: &ThreadId,
message_store: &Arc<M>,
state_store: &Arc<S>,
execution_store: Option<&Arc<dyn ToolExecutionStore>>,
audit_sink: &Arc<dyn crate::hooks::ToolAuditSink>,
provenance: &agent_sdk_foundation::audit::AuditProvenance,
) -> Result<InitializedState, AgentRunState>
where
M: MessageStore,
S: StateStore,
{
initialize_from_input(
input,
thread_id,
message_store,
state_store,
execution_store,
audit_sink,
provenance,
)
.await
.map_err(AgentRunState::Error)
}
async fn handle_run_loop_resume_state<Ctx, H, M>(
params: RunLoopResumeParams<'_, Ctx, H, M>,
) -> Option<AgentRunState>
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
M: MessageStore,
{
let turn = params.turn;
let thread_id = params.thread_id;
let event_store = params.event_store;
let hooks = params.hooks;
let authority = params.authority;
match handle_run_loop_resume(params).await {
Ok(Some(outcome)) => Some(outcome),
Ok(None) => {
if let Err(store_error) = finish_turn_or_error(event_store, thread_id, turn).await {
return Some(AgentRunState::Error(store_error));
}
None
}
Err(error) => {
if let Err(store_error) = send_event(
event_store,
thread_id,
turn,
hooks,
authority,
AgentEvent::error(&error.message, error.recoverable),
)
.await
{
return Some(AgentRunState::Error(store_error));
}
if let Err(store_error) = finish_turn_or_error(event_store, thread_id, turn).await {
return Some(AgentRunState::Error(store_error));
}
Some(AgentRunState::Error(error))
}
}
}
async fn initialize_single_turn_state<M, S>(
input: AgentInput,
thread_id: &ThreadId,
message_store: &Arc<M>,
state_store: &Arc<S>,
execution_store: Option<&Arc<dyn ToolExecutionStore>>,
audit_sink: &Arc<dyn crate::hooks::ToolAuditSink>,
provenance: &agent_sdk_foundation::audit::AuditProvenance,
) -> Result<InitializedState, TurnOutcome>
where
M: MessageStore,
S: StateStore,
{
match initialize_from_input(
input,
thread_id,
message_store,
state_store,
execution_store,
audit_sink,
provenance,
)
.await
{
Ok(state) => Ok(state),
Err(error) => Err(TurnOutcome::Error(error)),
}
}
/// Runs the single-turn resume and closes the turn in the event store unless
/// the outcome keeps it open (i.e. it is awaiting confirmation).
///
/// Returns the resume outcome, or `TurnOutcome::Error` when finishing the
/// turn fails.
async fn handle_single_turn_resume_state<Ctx, H, M, S>(
    params: SingleTurnResumeParams<Ctx, H, M, S>,
) -> TurnOutcome
where
    Ctx: Send + Sync + Clone + 'static,
    H: AgentHooks,
    M: MessageStore,
    S: StateStore,
{
    // `params` is consumed by the resume call below, so keep what is needed
    // to close the turn afterwards.
    let turn = params.turn;
    let thread_id = params.thread_id.clone();
    let event_store = Arc::clone(&params.event_store);
    let outcome = handle_single_turn_resume(params).await;
    if !turn_outcome_keeps_turn_open(&outcome)
        && let Err(store_error) = finish_turn_or_error(&event_store, &thread_id, turn).await
    {
        return TurnOutcome::Error(store_error);
    }
    outcome
}
/// Creates the [`TurnContext`] for a run from its starting values; all
/// per-turn bookkeeping fields start out empty or zero.
fn build_turn_context(
    thread_id: &ThreadId,
    turn: usize,
    total_usage: TokenUsage,
    state: AgentState,
    start_time: Instant,
    #[cfg(feature = "otel")] input_kind: &'static str,
) -> TurnContext {
    let thread_id = thread_id.clone();
    TurnContext {
        // Values supplied by the caller.
        thread_id,
        turn,
        total_usage,
        state,
        start_time,
        #[cfg(feature = "otel")]
        input_kind,
        // Bookkeeping that starts blank.
        compaction_retries: 0,
        tool_call_count: 0,
        pending_reminder: None,
        response_id: None,
        stop_reason: None,
    }
}
/// Builds a [`TurnSummary`] from the turn context plus the given turn usage,
/// provenance and turn options.
fn build_turn_summary(
    ctx: &TurnContext,
    provenance: &AuditProvenance,
    turn_options: &TurnOptions,
    turn_usage: TokenUsage,
) -> TurnSummary {
    let parts = TurnSummaryParts {
        thread_id: &ctx.thread_id,
        turn: ctx.turn,
        turn_usage,
        total_usage: &ctx.total_usage,
        provenance,
        response_id: ctx.response_id.as_deref(),
        stop_reason: ctx.stop_reason,
        tool_call_count: ctx.tool_call_count,
        start_time: ctx.start_time,
        turn_options,
    };
    build_turn_summary_from_parts(parts)
}
/// Inputs for `build_turn_summary_from_parts`.
struct TurnSummaryParts<'a> {
thread_id: &'a ThreadId,
turn: usize,
/// Moved into the summary as-is.
turn_usage: TokenUsage,
/// Cloned into the summary.
total_usage: &'a TokenUsage,
provenance: &'a AuditProvenance,
response_id: Option<&'a str>,
stop_reason: Option<agent_sdk_foundation::llm::StopReason>,
tool_call_count: usize,
/// The summary's `duration_ms` is the time elapsed since this instant.
start_time: Instant,
/// Source of the summary's `tool_runtime` and `strict_durability`.
turn_options: &'a TurnOptions,
}
fn build_turn_summary_from_parts(parts: TurnSummaryParts<'_>) -> TurnSummary {
TurnSummary {
thread_id: parts.thread_id.clone(),
turn: parts.turn,
total_turns: turns_to_u32(parts.turn),
turn_usage: parts.turn_usage,
total_usage: parts.total_usage.clone(),
provenance: parts.provenance.clone(),
response_id: parts.response_id.map(str::to_string),
stop_reason: parts.stop_reason,
tool_call_count: parts.tool_call_count,
duration_ms: duration_ms_saturating(parts.start_time.elapsed()),
tool_runtime: parts.turn_options.tool_runtime.clone(),
strict_durability: parts.turn_options.strict_durability,
}
}
/// Converts a duration to whole milliseconds, clamping to `u64::MAX` when the
/// value does not fit in a `u64`.
fn duration_ms_saturating(duration: std::time::Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    millis.try_into().unwrap_or(u64::MAX)
}
/// Builds a [`TurnSummary`] for a turn in which nothing ran: zero turns,
/// default usage, no response metadata and zero duration.
fn empty_turn_summary(
    thread_id: &ThreadId,
    turn: usize,
    provenance: &AuditProvenance,
    turn_options: &TurnOptions,
) -> TurnSummary {
    let thread_id = thread_id.clone();
    let provenance = provenance.clone();
    let tool_runtime = turn_options.tool_runtime.clone();
    let strict_durability = turn_options.strict_durability;

    TurnSummary {
        thread_id,
        turn,
        provenance,
        tool_runtime,
        strict_durability,
        // Nothing was executed, so every measurement is empty.
        total_turns: 0,
        tool_call_count: 0,
        duration_ms: 0,
        turn_usage: TokenUsage::default(),
        total_usage: TokenUsage::default(),
        response_id: None,
        stop_reason: None,
    }
}
/// Builds a `TurnOutcome::Cancelled` with zero turns, default usage and an
/// empty summary for `turn`.
fn cancelled_turn_outcome(
    thread_id: &ThreadId,
    turn: usize,
    provenance: &AuditProvenance,
    turn_options: &TurnOptions,
) -> TurnOutcome {
    let summary = empty_turn_summary(thread_id, turn, provenance, turn_options);
    TurnOutcome::Cancelled {
        total_turns: 0,
        total_usage: TokenUsage::default(),
        summary,
    }
}
/// Produces the outcome for a single turn that was cancelled before it
/// started.
///
/// Logs the cancellation, records the otel root event when that feature is
/// enabled, sends a `cancelled` event for turn 0 (a failure to send is
/// ignored), and returns a cancelled outcome for turn 0.
async fn precheck_single_turn_cancelled<H>(
    event_store: &Arc<dyn EventStore>,
    thread_id: &ThreadId,
    hooks: &Arc<H>,
    authority: &Arc<dyn EventAuthority>,
    provenance: &AuditProvenance,
    turn_options: &TurnOptions,
) -> TurnOutcome
where
    H: AgentHooks,
{
    // Nothing has run yet, so the event and the outcome both refer to turn 0.
    const PRECHECK_TURN: usize = 0;

    log::info!("Agent turn cancelled before execution started");
    #[cfg(feature = "otel")]
    crate::observability::instrument::record_root_event(
        "agent.cancelled",
        vec![crate::observability::attrs::kv(
            crate::observability::attrs::SDK_CANCEL_REASON,
            "cancel_token",
        )],
    );

    let cancelled_event = AgentEvent::cancelled(PRECHECK_TURN, TokenUsage::default());
    // The send result is deliberately discarded.
    let _ = send_event(
        event_store,
        thread_id,
        PRECHECK_TURN,
        hooks,
        authority,
        cancelled_event,
    )
    .await;

    cancelled_turn_outcome(thread_id, PRECHECK_TURN, provenance, turn_options)
}
/// Returns `true` only for `TurnOutcome::AwaitingConfirmation`; callers use
/// this to decide whether to skip finishing the turn in the event store.
const fn turn_outcome_keeps_turn_open(outcome: &TurnOutcome) -> bool {
matches!(outcome, TurnOutcome::AwaitingConfirmation { .. })
}
/// Builds `AgentRunState::Done` from the context's turn count and total usage.
fn done_run_state(ctx: &TurnContext) -> AgentRunState {
    let total_turns = turns_to_u32(ctx.turn);
    let total_usage = ctx.total_usage.clone();
    AgentRunState::Done {
        total_turns,
        total_usage,
    }
}
/// Builds `AgentRunState::Cancelled` from the context's turn count and total
/// usage.
fn cancelled_run_state(ctx: &TurnContext) -> AgentRunState {
    let total_turns = turns_to_u32(ctx.turn);
    let total_usage = ctx.total_usage.clone();
    AgentRunState::Cancelled {
        total_turns,
        total_usage,
    }
}
/// Sends a `cancelled` event carrying the context's turn and total usage.
///
/// The event is sent against `event_turn`, which callers may set to a
/// different turn number than `ctx.turn`.
///
/// # Errors
///
/// Propagates the error from `send_event`.
async fn emit_cancelled_event<H>(
    ctx: &TurnContext,
    event_store: &Arc<dyn EventStore>,
    hooks: &Arc<H>,
    authority: &Arc<dyn EventAuthority>,
    event_turn: usize,
) -> Result<(), AgentError>
where
    H: AgentHooks,
{
    let cancelled_event = AgentEvent::cancelled(ctx.turn, ctx.total_usage.clone());
    send_event(
        event_store,
        &ctx.thread_id,
        event_turn,
        hooks,
        authority,
        cancelled_event,
    )
    .await
}
/// Builds `AgentRunState::Refusal` from the context's turn count and total
/// usage.
fn refusal_run_state(ctx: &TurnContext) -> AgentRunState {
    let total_turns = turns_to_u32(ctx.turn);
    let total_usage = ctx.total_usage.clone();
    AgentRunState::Refusal {
        total_turns,
        total_usage,
    }
}
async fn emit_persistent_turn_complete<H>(
ctx: &TurnContext,
event_store: &Arc<dyn EventStore>,
hooks: &Arc<H>,
authority: &Arc<dyn EventAuthority>,
current_turn: usize,
) -> Result<(), AgentRunState>
where
H: AgentHooks,
{
if let Err(error) = send_event(
event_store,
&ctx.thread_id,
current_turn,
hooks,
authority,
AgentEvent::TurnComplete {
turn: ctx.turn,
usage: ctx.total_usage.clone(),
},
)
.await
{
return Err(AgentRunState::Error(error));
}
if let Err(error) = finish_turn_or_error(event_store, &ctx.thread_id, current_turn).await {
return Err(AgentRunState::Error(error));
}
Ok(())
}
/// Handles a finished turn when the run has an input channel to wait on.
///
/// Emits `TurnComplete` and finishes the current turn, then waits for either
/// the next input on `rx` or cancellation.
///
/// Returns `None` when a `Text` or `Message` input was received and appended
/// to the message store. Returns `Some(state)` otherwise:
/// * the error state if emitting/finishing the turn failed;
/// * `Done` if appending the new message failed (logged as a warning), or if
///   `recv` yielded anything other than a `Text`/`Message` input (including
///   `None`);
/// * on cancellation, `Cancelled` — or `Error` if the cancelled event could
///   not be sent.
async fn handle_persistent_done<M, H>(
PersistentDoneParams {
ctx,
rx,
message_store,
event_store,
hooks,
authority,
current_turn,
cancel_token,
}: PersistentDoneParams<'_, H, M>,
) -> Option<AgentRunState>
where
M: MessageStore,
H: AgentHooks,
{
if let Err(state) =
emit_persistent_turn_complete(ctx, event_store, hooks, authority, current_turn).await
{
return Some(state);
}
tokio::select! {
msg = rx.recv() => {
match msg {
Some(AgentInput::Text(text)) => {
let user_msg = Message::user(&text);
if let Err(error) = message_store.append(&ctx.thread_id, user_msg).await {
warn!("Failed to append injected message: {error}");
return Some(done_run_state(ctx));
}
None
}
Some(AgentInput::Message(blocks)) => {
let user_msg = Message::user_with_content(blocks);
if let Err(error) = message_store.append(&ctx.thread_id, user_msg).await {
warn!("Failed to append injected message: {error}");
return Some(done_run_state(ctx));
}
None
}
// Any other input kind, or `None` from `recv`, ends the run as done.
_ => Some(done_run_state(ctx)),
}
}
() = cancel_token.cancelled() => {
#[cfg(feature = "otel")]
crate::observability::instrument::record_root_event(
"agent.cancelled",
vec![
crate::observability::attrs::kv(
crate::observability::attrs::SDK_CANCEL_REASON,
"cancel_token",
),
crate::observability::attrs::kv_i64(
crate::observability::attrs::SDK_TURN_NUMBER,
i64::try_from(ctx.turn).unwrap_or(0),
),
],
);
// `current_turn` was already finished above, so the cancellation is
// reported against the following turn number.
let event_turn = current_turn.saturating_add(1);
if let Err(error) =
emit_cancelled_event(ctx, event_store, hooks, authority, event_turn).await
{
return Some(AgentRunState::Error(error));
}
// A failure to finish the cancellation turn is deliberately ignored.
let _ = finish_turn_or_error(event_store, &ctx.thread_id, event_turn).await;
Some(cancelled_run_state(ctx))
}
}
}
async fn finish_turn_or_run_state(
event_store: &Arc<dyn EventStore>,
thread_id: &ThreadId,
turn: usize,
) -> Result<(), RunLoopTurnAction> {
finish_turn_or_error(event_store, thread_id, turn)
.await
.map_err(|error| RunLoopTurnAction::Return(AgentRunState::Error(error)))
}
/// Maps the result of one executed turn to the next action of the run loop.
///
/// Every variant except `AwaitingConfirmation` and `Done` finishes
/// `current_turn` in the event store here; if finishing fails, the resulting
/// error state is returned instead of the variant's normal action. For `Done`
/// with an input channel, `handle_persistent_done` finishes the turn; without
/// one, this function returns `FinishRun` without finishing it.
async fn handle_run_loop_turn_result<H, M, S>(
RunLoopTurnResultParams {
result,
ctx,
input_rx,
message_store,
state_store,
event_store,
hooks,
authority,
cancel_token,
current_turn,
}: RunLoopTurnResultParams<'_, H, M, S>,
) -> RunLoopTurnAction
where
H: AgentHooks,
M: MessageStore,
S: StateStore,
{
match result {
InternalTurnResult::Continue { .. } => {
// A failed checkpoint save is only logged; the loop carries on.
if let Err(error) = state_store.save(&ctx.state).await {
warn!("Failed to save state checkpoint: {error}");
}
finish_turn_or_run_state(event_store, &ctx.thread_id, current_turn)
.await
.map_or_else(std::convert::identity, |()| RunLoopTurnAction::Continue)
}
InternalTurnResult::Done => {
// With an input channel the run waits for further input; `None` from
// the handler means a new message arrived and the loop continues.
if let Some(rx) = input_rx {
handle_persistent_done(super::types::PersistentDoneParams {
ctx,
rx,
message_store,
event_store,
hooks,
authority,
current_turn,
cancel_token,
})
.await
.map_or(RunLoopTurnAction::Continue, RunLoopTurnAction::Return)
} else {
RunLoopTurnAction::FinishRun
}
}
InternalTurnResult::Refusal => {
finish_turn_or_run_state(event_store, &ctx.thread_id, current_turn)
.await
.map_or_else(std::convert::identity, |()| {
RunLoopTurnAction::Return(refusal_run_state(ctx))
})
}
InternalTurnResult::Cancelled { .. } => {
if let Err(error) =
emit_cancelled_event(ctx, event_store, hooks, authority, current_turn).await
{
return RunLoopTurnAction::Return(AgentRunState::Error(error));
}
finish_turn_or_run_state(event_store, &ctx.thread_id, current_turn)
.await
.map_or_else(std::convert::identity, |()| {
RunLoopTurnAction::Return(cancelled_run_state(ctx))
})
}
// The turn is not finished here: it stays open while confirmation is pending.
InternalTurnResult::AwaitingConfirmation {
tool_call_id,
tool_name,
display_name,
input,
description,
continuation,
} => RunLoopTurnAction::Return(AgentRunState::AwaitingConfirmation {
tool_call_id,
tool_name,
display_name,
input,
description,
continuation: Box::new(ContinuationEnvelope::wrap(*continuation)),
}),
// Treated as an error in looping mode, per the message below.
InternalTurnResult::PendingToolCalls { .. } => finish_turn_or_run_state(
event_store,
&ctx.thread_id,
current_turn,
)
.await
.map_or_else(std::convert::identity, |()| {
RunLoopTurnAction::Return(AgentRunState::Error(crate::types::AgentError::new(
"PendingToolCalls returned in looping mode (expected inline tool execution)",
false,
)))
}),
InternalTurnResult::Error(error) => {
finish_turn_or_run_state(event_store, &ctx.thread_id, current_turn)
.await
.map_or_else(std::convert::identity, |()| {
RunLoopTurnAction::Return(AgentRunState::Error(error))
})
}
}
}
async fn finish_run_loop_success<H, S>(
ctx: TurnContext,
state_store: &Arc<S>,
event_store: &Arc<dyn EventStore>,
hooks: &Arc<H>,
authority: &Arc<dyn EventAuthority>,
) -> AgentRunState
where
H: AgentHooks,
S: StateStore,
{
if let Err(error) = state_store.save(&ctx.state).await {
warn!("Failed to save final state: {error}");
}
let duration = ctx.start_time.elapsed();
if let Err(error) = send_event(
event_store,
&ctx.thread_id,
ctx.turn,
hooks,
authority,
AgentEvent::done(
ctx.thread_id.clone(),
ctx.turn,
ctx.total_usage.clone(),
duration,
),
)
.await
{
return AgentRunState::Error(error);
}
if let Err(error) = finish_turn_or_error(event_store, &ctx.thread_id, ctx.turn).await {
return AgentRunState::Error(error);
}
AgentRunState::Done {
total_turns: turns_to_u32(ctx.turn),
total_usage: ctx.total_usage.clone(),
}
}
/// Drives the turn loop: executes turns until a turn result ends the run or
/// cancellation is observed.
///
/// Returns `Some(state)` when the loop ends with a concrete run state, and
/// `None` when a turn result maps to `RunLoopTurnAction::FinishRun`.
pub(super) async fn run_loop_turns<Ctx, P, H, M, S>(
RunLoopTurnsParams {
ctx,
tool_context,
provider,
tools,
hooks,
message_store,
state_store,
event_store,
authority,
config,
compaction_config,
compactor,
execution_store,
audit_sink,
provenance,
cancel_token,
mut input_rx,
turn_options,
#[cfg(feature = "otel")]
observability_store,
}: RunLoopTurnsParams<'_, Ctx, P, H, M, S>,
) -> Option<AgentRunState>
where
Ctx: Send + Sync + Clone + 'static,
P: LlmProvider,
H: AgentHooks,
M: MessageStore,
S: StateStore,
{
loop {
// Cancellation is checked before each turn starts.
if cancel_token.is_cancelled() {
log::info!("Agent run cancelled before turn {}", ctx.turn);
#[cfg(feature = "otel")]
crate::observability::instrument::record_root_event(
"agent.cancelled",
vec![
crate::observability::attrs::kv(
crate::observability::attrs::SDK_CANCEL_REASON,
"cancel_token",
),
crate::observability::attrs::kv_i64(
crate::observability::attrs::SDK_TURN_NUMBER,
i64::try_from(ctx.turn).unwrap_or(0),
),
],
);
// The cancellation is reported against the turn that would have run next.
let event_turn = ctx.turn.saturating_add(1);
if let Err(error) =
emit_cancelled_event(ctx, event_store, hooks, authority, event_turn).await
{
return Some(AgentRunState::Error(error));
}
// A failure to finish the cancellation turn is deliberately ignored.
let _ = finish_turn_or_error(event_store, &ctx.thread_id, event_turn).await;
return Some(cancelled_run_state(ctx));
}
let current_turn = ctx.turn.saturating_add(1);
// Each turn gets its own tool context bound to the event store, thread
// and turn number.
let turn_tool_context = tool_context.clone().with_event_store(
Arc::clone(event_store),
ctx.thread_id.clone(),
current_turn,
Arc::clone(authority),
);
let result = execute_turn(ExecuteTurnParameters {
event_store,
authority,
ctx,
tool_context: &turn_tool_context,
provider,
tools,
hooks,
message_store,
state_store,
config,
compaction_config,
compactor,
execution_store,
audit_sink,
provenance,
turn_options,
cancel_token,
#[cfg(feature = "otel")]
observability_store,
})
.await;
match handle_run_loop_turn_result(super::types::RunLoopTurnResultParams {
result,
ctx,
input_rx: input_rx.as_deref_mut(),
message_store,
state_store,
event_store,
hooks,
authority,
cancel_token,
current_turn,
})
.await
{
RunLoopTurnAction::Continue => {}
RunLoopTurnAction::FinishRun => return None,
RunLoopTurnAction::Return(state) => return Some(state),
}
}
}
/// Runs `process_resume` for single-turn mode and converts its result into a
/// [`TurnOutcome`].
///
/// * Resume completed — stores `turn` as the state's `turn_count`, saves the
///   state (a save failure is only logged), and returns `NeedsMoreTurns` with
///   a summary built from the resume metrics.
/// * Another tool needs confirmation — returns `AwaitingConfirmation` with
///   the continuation wrapped in a [`ContinuationEnvelope`] and a summary
///   built from the continuation.
/// * Resume failed — sends an error event and returns `Error` with the
///   original error, or with the store error if the event could not be sent.
///
/// This function does not finish the turn in the event store.
pub(super) async fn handle_single_turn_resume<Ctx, H, M, S>(
SingleTurnResumeParams {
resume_data,
turn,
total_usage,
state,
thread_id,
tool_context,
tools,
hooks,
event_store,
authority,
message_store,
state_store,
execution_store,
audit_sink,
provenance,
turn_options,
start_time,
}: SingleTurnResumeParams<Ctx, H, M, S>,
) -> TurnOutcome
where
Ctx: Send + Sync + Clone + 'static,
H: AgentHooks,
M: MessageStore,
S: StateStore,
{
let resume_result = process_resume(ResumeProcessingParameters {
resume_data,
turn,
total_usage: &total_usage,
state: &state,
thread_id: &thread_id,
tool_context: &tool_context,
tools: &tools,
hooks: &hooks,
event_store: &event_store,
authority: &authority,
message_store: &message_store,
execution_store: execution_store.as_ref(),
audit_sink: &audit_sink,
provenance: &provenance,
})
.await;
match resume_result {
Ok(ResumeProcessingResult::Completed {
turn_usage,
metrics,
}) => {
let mut updated_state = state;
updated_state.turn_count = turn;
if let Err(error) = state_store.save(&updated_state).await {
warn!("Failed to save state checkpoint: {error}");
}
let summary = build_turn_summary_from_parts(TurnSummaryParts {
thread_id: &thread_id,
turn,
turn_usage: turn_usage.clone(),
total_usage: &total_usage,
provenance: &provenance,
response_id: metrics.response_id.as_deref(),
stop_reason: metrics.stop_reason,
tool_call_count: metrics.tool_call_count,
start_time,
turn_options: &turn_options,
});
TurnOutcome::NeedsMoreTurns {
turn,
turn_usage,
total_usage,
summary,
}
}
Ok(ResumeProcessingResult::AwaitingConfirmation {
tool_call_id,
tool_name,
display_name,
input,
description,
continuation,
}) => {
// The summary must be built before the continuation is moved into the envelope.
let turn_usage = continuation.turn_usage.clone();
let summary = build_turn_summary_from_parts(TurnSummaryParts {
thread_id: &thread_id,
turn,
turn_usage,
total_usage: &total_usage,
provenance: &provenance,
response_id: continuation.response_id.as_deref(),
stop_reason: continuation.stop_reason,
tool_call_count: continuation.pending_tool_calls.len(),
start_time,
turn_options: &turn_options,
});
TurnOutcome::AwaitingConfirmation {
tool_call_id,
tool_name,
display_name,
input,
description,
continuation: Box::new(ContinuationEnvelope::wrap(*continuation)),
summary,
}
}
Err(error) => {
// A failure to send the error event replaces the original error.
if let Err(store_error) = send_event(
&event_store,
&thread_id,
turn,
&hooks,
&authority,
AgentEvent::error(&error.message, error.recoverable),
)
.await
{
return TurnOutcome::Error(store_error);
}
TurnOutcome::Error(error)
}
}
}
fn has_orphaned_tool_use(messages: &[Message]) -> bool {
let Some(last) = messages.last() else {
return false;
};
if last.role != Role::Assistant {
return false;
}
let Content::Blocks(blocks) = &last.content else {
return false;
};
blocks
.iter()
.any(|b| matches!(b, ContentBlock::ToolUse { .. }))
}
/// Builds a user message holding one error `ToolResult` for every `ToolUse`
/// block found in the last message of `messages`.
///
/// Returns `None` when the history is empty, when the last message is not
/// block-based, or when it contains no `ToolUse` blocks. The role of the last
/// message is not checked here.
fn synthesize_error_tool_results(messages: &[Message]) -> Option<Message> {
    let Content::Blocks(blocks) = &messages.last()?.content else {
        return None;
    };

    let mut results = Vec::new();
    for block in blocks {
        if let ContentBlock::ToolUse { id, .. } = block {
            results.push(ContentBlock::ToolResult {
                tool_use_id: id.clone(),
                content: "Tool execution was interrupted by a crash. Please retry.".to_string(),
                is_error: Some(true),
            });
        }
    }

    if results.is_empty() {
        None
    } else {
        Some(Message {
            role: Role::User,
            content: Content::Blocks(results),
        })
    }
}
/// Appends synthetic error tool results when the stored history ends with an
/// assistant message whose `ToolUse` blocks are not followed by anything.
///
/// # Errors
///
/// Returns a non-recoverable [`AgentError`] when the history cannot be loaded
/// or the recovery message cannot be appended.
async fn recover_orphaned_tool_use<M>(
    thread_id: &ThreadId,
    message_store: &Arc<M>,
) -> Result<(), AgentError>
where
    M: MessageStore,
{
    let history = message_store
        .get_history(thread_id)
        .await
        .map_err(|e| AgentError::new(format!("Failed to get history for recovery: {e}"), false))?;

    if !has_orphaned_tool_use(&history) {
        return Ok(());
    }
    warn!("Detected orphaned tool_use blocks — synthesizing error results for crash recovery");

    let Some(recovery_msg) = synthesize_error_tool_results(&history) else {
        return Ok(());
    };
    message_store
        .append(thread_id, recovery_msg)
        .await
        .map_err(|e| {
            AgentError::new(
                format!("Failed to append recovery tool results: {e}"),
                false,
            )
        })?;
    Ok(())
}
/// Entry point for a multi-turn agent run.
///
/// Without the `otel` feature this simply awaits `run_loop_inner`.
///
/// With the `otel` feature the inner future is wrapped in a root span:
/// the span is started from the run parameters, an optional trace state and
/// root context are built, the inner future runs with that context attached,
/// and afterwards the trace state is flushed and the span is ended with the
/// turn count, token usage and outcome of the run. Outcomes other than
/// `Done` and `Refusal` are reported with zero turns and empty usage.
pub(super) async fn run_loop<Ctx, P, H, M, S>(
    params: RunLoopParameters<Ctx, P, H, M, S>,
) -> AgentRunState
where
    Ctx: Send + Sync + Clone + 'static,
    P: LlmProvider,
    H: AgentHooks,
    M: MessageStore,
    S: StateStore,
{
    #[cfg(feature = "otel")]
    let started = crate::observability::instrument::start_root_span(
        &crate::observability::instrument::StartRootSpanParams {
            provider: params.provider.as_ref(),
            tools: &params.tools,
            config: &params.config,
            thread_id: &params.thread_id,
            input: &params.input,
            run_mode: "loop",
            run_options: &params.run_options,
        },
    );
    #[cfg(feature = "otel")]
    let trace_state = crate::observability::instrument::build_root_trace_state(
        started.is_recording,
        &params.run_options,
    );
    #[cfg(feature = "otel")]
    let root_context = {
        let cx = crate::observability::instrument::build_root_context(
            started.span_context.clone(),
            &params.run_options,
        );
        let cx =
            crate::observability::instrument::attach_root_event_sink(&cx, started.sink.clone());
        match trace_state.clone() {
            Some(state) => state.attach_to(&cx),
            None => cx,
        }
    };
    #[cfg(feature = "otel")]
    let result = {
        use opentelemetry::trace::FutureExt;
        run_loop_inner(params).with_context(root_context).await
    };
    #[cfg(not(feature = "otel"))]
    let result = run_loop_inner(params).await;
    #[cfg(feature = "otel")]
    {
        use crate::observability::instrument::{
            end_root_span, flush_root_trace_state, run_state_outcome,
        };
        let (turns, total_usage) = match &result {
            AgentRunState::Done {
                total_turns,
                total_usage,
            }
            | AgentRunState::Refusal {
                total_turns,
                total_usage,
            } => (usize::try_from(*total_turns).unwrap_or(0), total_usage),
            _ => {
                // Other outcomes carry no totals here; report zeros so the
                // span is still closed with well-formed attributes.
                static EMPTY: TokenUsage = TokenUsage {
                    input_tokens: 0,
                    output_tokens: 0,
                    cached_input_tokens: 0,
                    cache_creation_input_tokens: 0,
                };
                (0, &EMPTY)
            }
        };
        if let Some(state) = trace_state {
            flush_root_trace_state(&started.sink, state.as_ref());
        }
        end_root_span(started.sink, turns, total_usage, run_state_outcome(&result));
    }
    result
}
async fn run_loop_inner<Ctx, P, H, M, S>(
RunLoopParameters {
event_store,
authority,
thread_id,
input,
tool_context,
provider,
tools,
hooks,
message_store,
state_store,
config,
compaction_config,
compactor,
execution_store,
audit_sink,
cancel_token,
mut input_rx,
#[cfg(feature = "otel")]
run_options: _,
#[cfg(feature = "otel")]
observability_store,
}: RunLoopParameters<Ctx, P, H, M, S>,
) -> AgentRunState
where
Ctx: Send + Sync + Clone + 'static,
P: LlmProvider,
H: AgentHooks,
M: MessageStore,
S: StateStore,
{
let tool_context =
apply_tool_boundary_controls(tool_context, &cancel_token, config.tool_timeout_ms);
let provenance =
agent_sdk_foundation::audit::AuditProvenance::new(provider.provider(), provider.model());
let start_time = Instant::now();
#[cfg(feature = "otel")]
let input_kind = crate::observability::attrs::input_kind_str(&input);
let init_state = match initialize_run_loop_state(
input,
&thread_id,
&message_store,
&state_store,
execution_store.as_ref(),
&audit_sink,
&provenance,
)
.await
{
Ok(state) => state,
Err(error) => return error,
};
let InitializedState {
turn,
total_usage,
state,
resume_data,
} = init_state;
if let Some(resume_data) = resume_data {
let resume_tool_context = tool_context.clone().with_event_store(
Arc::clone(&event_store),
thread_id.clone(),
turn,
Arc::clone(&authority),
);
if let Some(outcome) = handle_run_loop_resume_state(RunLoopResumeParams {
resume_data,
turn,
total_usage: &total_usage,
state: &state,
thread_id: &thread_id,
tool_context: &resume_tool_context,
tools: &tools,
hooks: &hooks,
event_store: &event_store,
authority: &authority,
message_store: &message_store,
execution_store: execution_store.as_ref(),
audit_sink: &audit_sink,
provenance: &provenance,
})
.await
{
return outcome;
}
}
let mut ctx = build_turn_context(
&thread_id,
turn,
total_usage,
state,
start_time,
#[cfg(feature = "otel")]
input_kind,
);
let default_turn_options = TurnOptions::default();
if let Some(outcome) = run_loop_turns(RunLoopTurnsParams {
ctx: &mut ctx,
tool_context: &tool_context,
provider: &provider,
tools: &tools,
hooks: &hooks,
message_store: &message_store,
state_store: &state_store,
event_store: &event_store,
authority: &authority,
config: &config,
compaction_config: compaction_config.as_ref(),
compactor: compactor.as_ref(),
execution_store: execution_store.as_ref(),
audit_sink: &audit_sink,
provenance: &provenance,
cancel_token: &cancel_token,
input_rx: input_rx.as_mut(),
turn_options: &default_turn_options,
#[cfg(feature = "otel")]
observability_store: observability_store.as_ref(),
})
.await
{
return outcome;
}
finish_run_loop_success(ctx, &state_store, &event_store, &hooks, &authority).await
}
/// Entry point for executing exactly one agent turn.
///
/// Without the `otel` feature this simply awaits `run_single_turn_inner`.
///
/// With the `otel` feature the inner future is wrapped in a root span:
/// the span is started from the turn parameters, an optional trace state and
/// root context are built, the inner future runs with that context attached,
/// and afterwards the trace state is flushed and the span is ended with the
/// turn count, token usage and outcome. `Done`, `Refusal` and `Cancelled`
/// report their `total_turns`, `NeedsMoreTurns` reports its `turn`, and every
/// other outcome is reported with zero turns and empty usage.
pub(super) async fn run_single_turn<Ctx, P, H, M, S>(
    params: TurnParameters<Ctx, P, H, M, S>,
) -> TurnOutcome
where
    Ctx: Send + Sync + Clone + 'static,
    P: LlmProvider,
    H: AgentHooks,
    M: MessageStore,
    S: StateStore,
{
    #[cfg(feature = "otel")]
    let started = crate::observability::instrument::start_root_span(
        &crate::observability::instrument::StartRootSpanParams {
            provider: params.provider.as_ref(),
            tools: &params.tools,
            config: &params.config,
            thread_id: &params.thread_id,
            input: &params.input,
            run_mode: "single_turn",
            run_options: &params.run_options,
        },
    );
    #[cfg(feature = "otel")]
    let trace_state = crate::observability::instrument::build_root_trace_state(
        started.is_recording,
        &params.run_options,
    );
    #[cfg(feature = "otel")]
    let root_context = {
        let cx = crate::observability::instrument::build_root_context(
            started.span_context.clone(),
            &params.run_options,
        );
        let cx =
            crate::observability::instrument::attach_root_event_sink(&cx, started.sink.clone());
        match trace_state.clone() {
            Some(state) => state.attach_to(&cx),
            None => cx,
        }
    };
    #[cfg(feature = "otel")]
    let outcome = {
        use opentelemetry::trace::FutureExt;
        run_single_turn_inner(params)
            .with_context(root_context)
            .await
    };
    #[cfg(not(feature = "otel"))]
    let outcome = run_single_turn_inner(params).await;
    #[cfg(feature = "otel")]
    {
        use crate::observability::instrument::{
            end_root_span, flush_root_trace_state, turn_outcome_str,
        };
        let (turns, total_usage) = match &outcome {
            TurnOutcome::Done {
                total_turns,
                total_usage,
                ..
            }
            | TurnOutcome::Refusal {
                total_turns,
                total_usage,
                ..
            }
            | TurnOutcome::Cancelled {
                total_turns,
                total_usage,
                ..
            } => (usize::try_from(*total_turns).unwrap_or(0), total_usage),
            TurnOutcome::NeedsMoreTurns {
                turn, total_usage, ..
            } => (*turn, total_usage),
            _ => {
                // Other outcomes carry no totals here; report zeros so the
                // span is still closed with well-formed attributes.
                static EMPTY: TokenUsage = TokenUsage {
                    input_tokens: 0,
                    output_tokens: 0,
                    cached_input_tokens: 0,
                    cache_creation_input_tokens: 0,
                };
                (0, &EMPTY)
            }
        };
        if let Some(state) = trace_state {
            flush_root_trace_state(&started.sink, state.as_ref());
        }
        end_root_span(started.sink, turns, total_usage, turn_outcome_str(&outcome));
    }
    outcome
}
async fn run_single_turn_inner<Ctx, P, H, M, S>(
TurnParameters {
event_store,
authority,
thread_id,
input,
tool_context,
provider,
tools,
hooks,
message_store,
state_store,
config,
compaction_config,
compactor,
execution_store,
audit_sink,
cancel_token,
turn_options,
#[cfg(feature = "otel")]
run_options: _,
#[cfg(feature = "otel")]
observability_store,
}: TurnParameters<Ctx, P, H, M, S>,
) -> TurnOutcome
where
Ctx: Send + Sync + Clone + 'static,
P: LlmProvider,
H: AgentHooks,
M: MessageStore,
S: StateStore,
{
let provenance =
agent_sdk_foundation::audit::AuditProvenance::new(provider.provider(), provider.model());
if cancel_token.is_cancelled() {
return precheck_single_turn_cancelled(
&event_store,
&thread_id,
&hooks,
&authority,
&provenance,
&turn_options,
)
.await;
}
let tool_context =
apply_tool_boundary_controls(tool_context, &cancel_token, config.tool_timeout_ms);
let start_time = Instant::now();
#[cfg(feature = "otel")]
let input_kind = crate::observability::attrs::input_kind_str(&input);
let init_state = match initialize_single_turn_state(
input,
&thread_id,
&message_store,
&state_store,
execution_store.as_ref(),
&audit_sink,
&provenance,
)
.await
{
Ok(state) => state,
Err(outcome) => return outcome,
};
let InitializedState {
turn,
total_usage,
state,
resume_data,
} = init_state;
if let Some(resume_data) = resume_data {
let resume_tool_context = tool_context.clone().with_event_store(
Arc::clone(&event_store),
thread_id.clone(),
turn,
Arc::clone(&authority),
);
return handle_single_turn_resume_state(SingleTurnResumeParams {
resume_data,
turn,
total_usage,
state,
thread_id: thread_id.clone(),
tool_context: resume_tool_context,
tools,
hooks,
event_store: Arc::clone(&event_store),
authority,
message_store,
state_store,
execution_store,
audit_sink,
provenance,
turn_options: turn_options.clone(),
start_time,
})
.await;
}
run_single_turn_execute(SingleTurnExecuteParams {
event_store,
authority,
thread_id,
tool_context,
provider,
tools,
hooks,
message_store,
state_store,
config,
compaction_config,
compactor,
execution_store,
audit_sink,
provenance,
turn_options,
cancel_token,
turn,
total_usage,
state,
start_time,
#[cfg(feature = "otel")]
input_kind,
#[cfg(feature = "otel")]
observability_store,
})
.await
}
struct SingleTurnExecuteParams<Ctx, P, H, M, S> {
event_store: Arc<dyn EventStore>,
authority: Arc<dyn EventAuthority>,
thread_id: ThreadId,
tool_context: crate::tools::ToolContext<Ctx>,
provider: Arc<P>,
tools: Arc<crate::tools::ToolRegistry<Ctx>>,
hooks: Arc<H>,
message_store: Arc<M>,
state_store: Arc<S>,
config: AgentConfig,
compaction_config: Option<CompactionConfig>,
compactor: Option<Arc<dyn ContextCompactor>>,
execution_store: Option<Arc<dyn ToolExecutionStore>>,
audit_sink: Arc<dyn crate::hooks::ToolAuditSink>,
provenance: agent_sdk_foundation::audit::AuditProvenance,
turn_options: TurnOptions,
cancel_token: CancellationToken,
turn: usize,
total_usage: TokenUsage,
state: AgentState,
start_time: Instant,
#[cfg(feature = "otel")]
input_kind: &'static str,
#[cfg(feature = "otel")]
observability_store: Option<Arc<dyn crate::observability::ObservabilityStore>>,
}
async fn run_single_turn_execute<Ctx, P, H, M, S>(
SingleTurnExecuteParams {
event_store,
authority,
thread_id,
tool_context,
provider,
tools,
hooks,
message_store,
state_store,
config,
compaction_config,
compactor,
execution_store,
audit_sink,
provenance,
turn_options,
cancel_token,
turn,
total_usage,
state,
start_time,
#[cfg(feature = "otel")]
input_kind,
#[cfg(feature = "otel")]
observability_store,
}: SingleTurnExecuteParams<Ctx, P, H, M, S>,
) -> TurnOutcome
where
Ctx: Send + Sync + Clone + 'static,
P: LlmProvider,
H: AgentHooks,
M: MessageStore,
S: StateStore,
{
let mut ctx = build_turn_context(
&thread_id,
turn,
total_usage,
state,
start_time,
#[cfg(feature = "otel")]
input_kind,
);
let current_turn = ctx.turn.saturating_add(1);
let turn_tool_context = tool_context.clone().with_event_store(
Arc::clone(&event_store),
thread_id.clone(),
current_turn,
Arc::clone(&authority),
);
let result = execute_turn(ExecuteTurnParameters {
event_store: &event_store,
authority: &authority,
ctx: &mut ctx,
tool_context: &turn_tool_context,
provider: &provider,
tools: &tools,
hooks: &hooks,
message_store: &message_store,
state_store: &state_store,
config: &config,
compaction_config: compaction_config.as_ref(),
compactor: compactor.as_ref(),
execution_store: execution_store.as_ref(),
audit_sink: &audit_sink,
provenance: &provenance,
turn_options: &turn_options,
cancel_token: &cancel_token,
#[cfg(feature = "otel")]
observability_store: observability_store.as_ref(),
})
.await;
let outcome = convert_turn_result(ConvertTurnResultParams {
result,
ctx,
event_store: &event_store,
hooks: &hooks,
authority: &authority,
thread_id: thread_id.clone(),
current_turn,
state_store: &state_store,
provenance: &provenance,
turn_options: &turn_options,
})
.await;
if !turn_outcome_keeps_turn_open(&outcome)
&& let Err(store_error) = finish_turn_or_error(&event_store, &thread_id, current_turn).await
{
return TurnOutcome::Error(store_error);
}
outcome
}
async fn convert_cancelled_turn<H>(
ctx: &TurnContext,
event_store: &Arc<dyn EventStore>,
hooks: &Arc<H>,
authority: &Arc<dyn EventAuthority>,
provenance: &AuditProvenance,
turn_options: &TurnOptions,
turn_usage: TokenUsage,
) -> TurnOutcome
where
H: AgentHooks,
{
if let Err(error) = emit_cancelled_event(ctx, event_store, hooks, authority, ctx.turn).await {
return TurnOutcome::Error(error);
}
let summary = build_turn_summary(ctx, provenance, turn_options, turn_usage);
TurnOutcome::Cancelled {
total_turns: turns_to_u32(ctx.turn),
total_usage: ctx.total_usage.clone(),
summary,
}
}
/// Borrowed inputs for `convert_done_turn`.
struct ConvertDoneParams<'a, H, S> {
/// Turn context; its state, turn counter, total usage and start time are read.
ctx: &'a TurnContext,
/// Store the final agent state is saved to.
state_store: &'a Arc<S>,
/// Event store the done event is sent to.
event_store: &'a Arc<dyn EventStore>,
/// Hooks passed along when the done event is sent.
hooks: &'a Arc<H>,
/// Event authority passed along when the done event is sent.
authority: &'a Arc<dyn EventAuthority>,
/// Thread the done event is recorded for.
thread_id: &'a ThreadId,
/// Turn index the done event is sent under.
current_turn: usize,
/// Provenance used when building the turn summary.
provenance: &'a AuditProvenance,
/// Turn options used when building the turn summary.
turn_options: &'a TurnOptions,
}
async fn convert_done_turn<H, S>(params: ConvertDoneParams<'_, H, S>) -> TurnOutcome
where
H: AgentHooks,
S: StateStore,
{
let ConvertDoneParams {
ctx,
state_store,
event_store,
hooks,
authority,
thread_id,
current_turn,
provenance,
turn_options,
} = params;
if let Err(e) = state_store.save(&ctx.state).await {
warn!("Failed to save final state: {e}");
}
let duration = ctx.start_time.elapsed();
if let Err(error) = send_event(
event_store,
thread_id,
current_turn,
hooks,
authority,
AgentEvent::done(
thread_id.clone(),
ctx.turn,
ctx.total_usage.clone(),
duration,
),
)
.await
{
return TurnOutcome::Error(error);
}
let summary = build_turn_summary(ctx, provenance, turn_options, ctx.total_usage.clone());
TurnOutcome::Done {
total_turns: turns_to_u32(ctx.turn),
total_usage: ctx.total_usage.clone(),
summary,
}
}
pub(super) async fn convert_turn_result<H: AgentHooks, S: StateStore>(
ConvertTurnResultParams {
result,
ctx,
event_store,
hooks,
authority,
thread_id,
current_turn,
state_store,
provenance,
turn_options,
}: ConvertTurnResultParams<'_, H, S>,
) -> TurnOutcome {
match result {
InternalTurnResult::Continue { turn_usage } => {
if let Err(e) = state_store.save(&ctx.state).await {
warn!("Failed to save state checkpoint: {e}");
}
let summary = build_turn_summary(&ctx, provenance, turn_options, turn_usage.clone());
TurnOutcome::NeedsMoreTurns {
turn: ctx.turn,
turn_usage,
total_usage: ctx.total_usage,
summary,
}
}
InternalTurnResult::Done => {
convert_done_turn(ConvertDoneParams {
ctx: &ctx,
state_store,
event_store,
hooks,
authority,
thread_id: &thread_id,
current_turn,
provenance,
turn_options,
})
.await
}
InternalTurnResult::Refusal => {
let summary =
build_turn_summary(&ctx, provenance, turn_options, ctx.total_usage.clone());
TurnOutcome::Refusal {
total_turns: turns_to_u32(ctx.turn),
total_usage: ctx.total_usage.clone(),
summary,
}
}
InternalTurnResult::Cancelled { turn_usage } => {
convert_cancelled_turn(
&ctx,
event_store,
hooks,
authority,
provenance,
turn_options,
turn_usage,
)
.await
}
InternalTurnResult::AwaitingConfirmation {
tool_call_id,
tool_name,
display_name,
input,
description,
continuation,
} => {
let turn_usage = continuation.turn_usage.clone();
let summary = build_turn_summary(&ctx, provenance, turn_options, turn_usage);
TurnOutcome::AwaitingConfirmation {
tool_call_id,
tool_name,
display_name,
input,
description,
continuation: Box::new(ContinuationEnvelope::wrap(*continuation)),
summary,
}
}
InternalTurnResult::PendingToolCalls {
turn_usage,
pending_tool_calls,
continuation,
} => {
let summary = build_turn_summary(&ctx, provenance, turn_options, turn_usage.clone());
TurnOutcome::PendingToolCalls {
turn: ctx.turn,
turn_usage,
total_usage: ctx.total_usage,
tool_calls: pending_tool_calls,
continuation: Box::new(ContinuationEnvelope::wrap(*continuation)),
summary,
}
}
InternalTurnResult::Error(e) => TurnOutcome::Error(e),
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExternalToolResult, PendingToolCallInfo, ToolTier};

    /// Builds a `ToolUse` block with the given id, tool name and JSON input.
    fn tool_use(id: &str, name: &str, input: serde_json::Value) -> ContentBlock {
        ContentBlock::ToolUse {
            id: id.to_string(),
            name: name.to_string(),
            input,
            thought_signature: None,
        }
    }

    /// Builds an assistant message made of the given content blocks.
    fn assistant_blocks(blocks: Vec<ContentBlock>) -> Message {
        Message {
            role: Role::Assistant,
            content: Content::Blocks(blocks),
        }
    }

    /// Builds a pending tool call with empty JSON inputs and no listen context.
    fn pending_call(
        id: &'static str,
        name: &'static str,
        display_name: &'static str,
        tier: ToolTier,
    ) -> PendingToolCallInfo {
        PendingToolCallInfo {
            id: id.into(),
            name: name.into(),
            display_name: display_name.into(),
            tier,
            input: serde_json::json!({}),
            effective_input: serde_json::json!({}),
            listen_context: None,
        }
    }

    /// Builds a turn-1 continuation on a fresh thread with the given pending calls.
    fn continuation_with(pending_tool_calls: Vec<PendingToolCallInfo>) -> AgentContinuation {
        let thread = ThreadId::new();
        AgentContinuation {
            thread_id: thread.clone(),
            turn: 1,
            total_usage: TokenUsage::default(),
            turn_usage: TokenUsage::default(),
            pending_tool_calls,
            awaiting_index: 0,
            completed_results: Vec::new(),
            state: AgentState::new(thread),
            response_id: None,
            stop_reason: None,
            response_content: Vec::new(),
        }
    }

    /// Builds a successful external result for the given tool call id.
    fn external_ok(tool_call_id: &'static str, payload: &'static str) -> ExternalToolResult {
        ExternalToolResult {
            tool_call_id: tool_call_id.into(),
            result: ToolResult::success(payload),
        }
    }

    #[test]
    fn test_has_orphaned_tool_use_empty_history() {
        assert!(!has_orphaned_tool_use(&[]));
    }

    #[test]
    fn test_has_orphaned_tool_use_user_last() {
        let history = vec![Message::user("hello")];
        assert!(!has_orphaned_tool_use(&history));
    }

    #[test]
    fn test_has_orphaned_tool_use_assistant_text_only() {
        let history = vec![Message::assistant("Sure, I can help.")];
        assert!(!has_orphaned_tool_use(&history));
    }

    #[test]
    fn test_has_orphaned_tool_use_assistant_with_tool_use() {
        let history = vec![assistant_blocks(vec![tool_use(
            "tool_1",
            "read",
            serde_json::json!({"path": "/test"}),
        )])];
        assert!(has_orphaned_tool_use(&history));
    }

    #[test]
    fn test_synthesize_error_tool_results() {
        let history = vec![assistant_blocks(vec![
            ContentBlock::Text {
                text: "Let me read that.".to_string(),
            },
            tool_use("tool_1", "read", serde_json::json!({"path": "/test"})),
            tool_use("tool_2", "grep", serde_json::json!({"pattern": "foo"})),
        ])];

        let recovery = synthesize_error_tool_results(&history);
        assert!(recovery.is_some());
        let msg = recovery.unwrap();
        assert_eq!(msg.role, Role::User);
        let Content::Blocks(blocks) = &msg.content else {
            panic!("Expected Blocks");
        };
        // One error result per tool_use block; the text block is ignored.
        assert_eq!(blocks.len(), 2);
        for block in blocks {
            let ContentBlock::ToolResult { is_error, .. } = block else {
                panic!("Expected ToolResult");
            };
            assert_eq!(*is_error, Some(true));
        }
    }

    #[test]
    fn test_validate_external_tool_results_ok() {
        let cont = continuation_with(vec![pending_call(
            "call_1",
            "echo",
            "Echo",
            ToolTier::Observe,
        )]);
        let results = vec![external_ok("call_1", "ok")];
        assert!(validate_external_tool_results(&cont, &results).is_ok());
    }

    #[test]
    fn test_validate_external_tool_results_missing() {
        let cont = continuation_with(vec![
            pending_call("call_1", "echo", "Echo", ToolTier::Observe),
            pending_call("call_2", "write", "Write", ToolTier::Confirm),
        ]);
        let results = vec![external_ok("call_1", "ok")];

        let err = validate_external_tool_results(&cont, &results);
        assert!(err.is_err());
        let msg = err.unwrap_err().to_string();
        assert!(
            msg.contains("call_2"),
            "Error should mention missing call_2: {msg}"
        );
    }

    #[test]
    fn test_validate_external_tool_results_unknown_id() {
        let cont = continuation_with(vec![pending_call(
            "call_1",
            "echo",
            "Echo",
            ToolTier::Observe,
        )]);
        let results = vec![external_ok("call_1", "ok"), external_ok("bogus_id", "extra")];

        let err = validate_external_tool_results(&cont, &results);
        assert!(err.is_err());
        let msg = err.unwrap_err().to_string();
        assert!(
            msg.contains("bogus_id"),
            "Error should mention bogus_id: {msg}"
        );
    }

    #[test]
    fn test_validate_external_tool_results_empty_continuation() {
        let cont = continuation_with(Vec::new());

        let err = validate_external_tool_results(&cont, &[]);
        assert!(err.is_err());
        let msg = err.unwrap_err().to_string();
        assert!(msg.contains("no pending tool calls"), "Error: {msg}");
    }
}