use crate::budget::Budget;
use crate::error::{AgentError, ToolError};
use crate::event::AgentEvent;
use crate::hooks::{HookInvocation, HookPoint};
use crate::ops::{ToolDispatchOutcome, ToolDispatchTimeoutPolicy};
use crate::retry::RetryPolicy;
use crate::service::TurnToolOverlay;
use crate::session::{PendingSystemContextAppend, Session};
use crate::state::LoopState;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
use crate::tool_scope::{
EXTERNAL_TOOL_FILTER_METADATA_KEY, ExternalToolSurfaceBaseState,
ExternalToolSurfaceDeltaOperation, ExternalToolSurfaceDeltaPhase,
ExternalToolSurfaceEntrySnapshot, ExternalToolSurfaceSnapshot, ToolFilter, ToolScopeRevision,
ToolScopeStageError,
};
use crate::turn_execution_authority::{
TurnPrimitiveKind, TurnTerminalCauseKind, TurnTerminalOutcome,
};
use crate::types::{ContentInput, Message, RunResult, ToolCallView, ToolNameSet};
use async_trait::async_trait;
use serde_json::value::to_raw_value;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::mpsc;
use super::{Agent, AgentBuilder, AgentLlmClient, AgentSessionStore, AgentToolDispatcher};
/// Reports whether `dispatcher` advertises a tool called `name`.
///
/// Dispatchers whose catalog capabilities set `exact_catalog` are checked through
/// `tool_catalog()`; every other dispatcher is checked through `tools()`.
fn dispatcher_knows_tool<T>(dispatcher: &T, name: &str) -> bool
where
    T: AgentToolDispatcher + ?Sized,
{
    let has_exact_catalog = dispatcher.tool_catalog_capabilities().exact_catalog;
    if !has_exact_catalog {
        return dispatcher.tools().iter().any(|tool| tool.name == name);
    }
    dispatcher
        .tool_catalog()
        .iter()
        .any(|entry| entry.tool.name == name)
}
/// Decides whether a call to the tool `name` may proceed.
///
/// Returns `Ok(())` when `name` is in `visible_names`. Otherwise the error is
/// `ToolError::access_denied` if the dispatcher knows the tool, and
/// `ToolError::not_found` if it does not.
fn precheck_visible_tool_call<T>(
    dispatcher: &T,
    visible_names: &ToolNameSet,
    name: &str,
) -> Result<(), ToolError>
where
    T: AgentToolDispatcher + ?Sized,
{
    if visible_names.contains(name) {
        Ok(())
    } else if dispatcher_knows_tool(dispatcher, name) {
        // The tool exists but is hidden from the current visible set.
        Err(ToolError::access_denied(name))
    } else {
        Err(ToolError::not_found(name))
    }
}
/// Converts the turn-state snapshot exposed by `handle` into a
/// `crate::AgentExecutionSnapshot`, stamping it with `applied_cursor`.
///
/// A missing `primitive_kind` / `terminal_outcome` falls back to the respective `None`
/// variant, and an empty pending-operation list is reported as `None`.
/// Returns `None` when any of the counters does not fit into a `u32`.
fn runtime_execution_snapshot(
    handle: &dyn crate::TurnStateHandle,
    applied_cursor: crate::completion_feed::CompletionSeq,
) -> Option<crate::AgentExecutionSnapshot> {
    let raw = handle.snapshot();

    // Narrow every counter first; an out-of-range value aborts the whole conversion.
    let tool_calls_pending = u32::try_from(raw.tool_calls_pending).ok()?;
    let boundary_count = u32::try_from(raw.boundary_count).ok()?;
    let extraction_attempts = u32::try_from(raw.extraction_attempts).ok()?;
    let max_extraction_retries = u32::try_from(raw.max_extraction_retries).ok()?;

    let pending_operation_ids = (!raw.pending_op_refs.is_empty()).then(|| {
        raw.pending_op_refs
            .iter()
            .map(|op_ref| op_ref.operation_id.clone())
            .collect()
    });
    let barrier_operation_ids = raw.barrier_operation_ids.iter().cloned().collect();

    Some(crate::AgentExecutionSnapshot {
        loop_state: raw.loop_state,
        turn_phase: raw.turn_phase,
        active_run_id: raw.active_run_id,
        primitive_kind: raw.primitive_kind.unwrap_or(TurnPrimitiveKind::None),
        admitted_content_shape: raw.admitted_content_shape,
        vision_enabled: raw.vision_enabled,
        image_tool_results_enabled: raw.image_tool_results_enabled,
        tool_calls_pending,
        pending_operation_ids,
        barrier_operation_ids,
        has_barrier_ops: raw.has_barrier_ops,
        barrier_satisfied: raw.barrier_satisfied,
        boundary_count,
        cancel_after_boundary: raw.cancel_after_boundary,
        terminal_outcome: raw.terminal_outcome.unwrap_or(TurnTerminalOutcome::None),
        terminal_cause_kind: raw.terminal_cause_kind,
        extraction_attempts,
        max_extraction_retries,
        applied_cursor,
    })
}
/// Converts the diagnostic snapshot exposed by `handle` into an
/// `ExternalToolSurfaceSnapshot`.
///
/// Each entry is marked `visible` when its surface id appears in the snapshot's
/// `visible_surfaces`; absent optional fields fall back to `Absent`, `0`, or the `None`
/// variants. The conversion as written always yields `Some`.
fn runtime_external_tool_surface_snapshot(
    handle: &dyn crate::ExternalToolSurfaceHandle,
) -> Option<ExternalToolSurfaceSnapshot> {
    let raw = handle.diagnostic_snapshot();
    let visible_surfaces = raw.visible_surfaces;

    let entries: Vec<_> = raw
        .entries
        .into_iter()
        .map(|entry| ExternalToolSurfaceEntrySnapshot {
            // `visible` must be computed before `surface_id` is moved out of `entry`.
            visible: visible_surfaces.contains(&entry.surface_id),
            surface_id: entry.surface_id,
            base_state: entry
                .base_state
                .unwrap_or(ExternalToolSurfaceBaseState::Absent),
            // Any recorded removal timestamp/turn counts as removal timing.
            has_removal_timing: entry.removal_draining_since_ms.is_some()
                || entry.removal_timeout_at_ms.is_some()
                || entry.removal_applied_at_turn.is_some(),
            pending_op: entry.pending_op,
            staged_op: entry.staged_op,
            staged_intent_sequence: entry.staged_intent_sequence.unwrap_or(0),
            pending_task_sequence: entry.pending_task_sequence.unwrap_or(0),
            pending_lineage_sequence: entry.pending_lineage_sequence.unwrap_or(0),
            inflight_call_count: entry.inflight_calls,
            last_delta_operation: entry
                .last_delta_operation
                .unwrap_or(ExternalToolSurfaceDeltaOperation::None),
            last_delta_phase: entry
                .last_delta_phase
                .unwrap_or(ExternalToolSurfaceDeltaPhase::None),
        })
        .collect();

    Some(ExternalToolSurfaceSnapshot {
        phase: raw.surface_phase,
        snapshot_epoch: raw.snapshot_epoch,
        snapshot_aligned_epoch: raw.snapshot_aligned_epoch,
        entries,
    })
}
/// Object-level interface for running an agent on a prompt.
///
/// The trait is declared through `async_trait`; on `wasm32` targets the `?Send` form of
/// the macro is used.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentRunner: Send {
/// Runs the agent on `prompt` and returns the run result or an [`AgentError`].
async fn run(&mut self, prompt: ContentInput) -> Result<RunResult, AgentError>;
/// Same contract as `run`, additionally taking the sender `tx` for [`AgentEvent`]s.
async fn run_with_events(
&mut self,
prompt: ContentInput,
tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, AgentError>;
}
impl<C, T, S> Agent<C, T, S>
where
C: AgentLlmClient + ?Sized,
T: AgentToolDispatcher + ?Sized + 'static,
S: AgentSessionStore + ?Sized,
{
/// Stages `filter` as the external tool filter on the tool scope and returns the
/// resulting revision.
///
/// When the tool scope can produce a visibility state, that state is written into the
/// session (a failure is only logged) and the `EXTERNAL_TOOL_FILTER_METADATA_KEY`
/// metadata entry is removed from the session.
///
/// # Errors
/// Propagates the `ToolScopeStageError` returned by staging the filter.
pub fn stage_external_tool_filter(
    &mut self,
    filter: ToolFilter,
) -> Result<ToolScopeRevision, ToolScopeStageError> {
    let scope_handle = self.tool_scope.handle();
    let revision = scope_handle.stage_external_filter(filter)?;
    let _ = scope_handle.staged_revision();

    let Ok(visibility_state) = self.tool_scope.visibility_state() else {
        return Ok(revision);
    };
    if let Err(err) = self.session.set_tool_visibility_state(visibility_state) {
        tracing::warn!(
            error = %err,
            "failed to persist staged canonical tool visibility state"
        );
    }
    self.session
        .remove_metadata(EXTERNAL_TOOL_FILTER_METADATA_KEY);
    Ok(revision)
}
/// Installs or clears the per-turn tool overlay on the tool scope.
///
/// `None` clears the overlay. `Some(overlay)` sets it: `allowed_tools` becomes an
/// optional allow-set and `blocked_tools` becomes a deny-set (empty when absent).
///
/// # Errors
/// Propagates the `ToolScopeStageError` returned when setting the overlay.
pub fn set_flow_tool_overlay(
    &mut self,
    overlay: Option<TurnToolOverlay>,
) -> Result<(), ToolScopeStageError> {
    let scope_handle = self.tool_scope.handle();
    let Some(overlay) = overlay else {
        scope_handle.clear_turn_overlay();
        return Ok(());
    };

    let allow = overlay
        .allowed_tools
        .map(|tools| tools.into_iter().collect::<HashSet<_>>());
    let deny: HashSet<_> = overlay
        .blocked_tools
        .unwrap_or_default()
        .into_iter()
        .collect();
    scope_handle.set_turn_overlay(allow, deny)?;
    Ok(())
}
/// Stores `execution_kind` as the agent's current runtime execution kind.
/// Passing `None` leaves the kind unset.
pub fn set_runtime_execution_kind(
&mut self,
execution_kind: Option<crate::lifecycle::RuntimeExecutionKind>,
) {
self.runtime_execution_kind = execution_kind;
}
/// Resets the stored runtime execution kind to `None`.
fn clear_runtime_execution_kind(&mut self) {
self.runtime_execution_kind = None;
}
/// Verifies that a runtime execution kind is present whenever one is required.
///
/// # Errors
/// Returns `AgentError::InternalError` when `runtime_execution_kind_required` is set
/// but no execution kind has been stored.
fn require_runtime_execution_kind(&self) -> Result<(), AgentError> {
    let kind_missing =
        self.runtime_execution_kind_required && self.runtime_execution_kind.is_none();
    if !kind_missing {
        return Ok(());
    }
    Err(AgentError::InternalError(
        "runtime_execution_kind not set: turn-state handle is attached but \
         the runtime did not stamp RuntimeTurnMetadata.execution_kind"
            .to_string(),
    ))
}
pub(crate) fn apply_session_effects(
&mut self,
effects: &[crate::ops::SessionEffect],
) -> Result<(), crate::error::AgentError> {
use crate::error::AgentError;
let mut build_state = self.session.build_state().unwrap_or_default();
let mut build_state_changed = false;
let mut visibility_changed = false;
for effect in effects {
match effect {
crate::ops::SessionEffect::GrantManageMob { mob_id } => {
let authority =
build_state
.mob_tool_authority_context
.as_mut()
.ok_or_else(|| {
AgentError::InternalError(
"mob authority effect applied without canonical authority context"
.into(),
)
})?;
authority.grant_manage_mob_in_place(mob_id.clone());
build_state_changed = true;
}
crate::ops::SessionEffect::RequestDeferredTools { authorities } => {
self.tool_scope
.add_requested_deferred_authorities(authorities)
.map_err(|err| {
AgentError::InternalError(format!(
"failed to record requested deferred tool authorities: {err}"
))
})?;
visibility_changed = true;
}
crate::ops::SessionEffect::AppendAssistantBlocks { blocks } => {
self.session.push(crate::types::Message::BlockAssistant(
crate::types::BlockAssistantMessage::new(
blocks.clone(),
crate::types::StopReason::EndTurn,
),
));
}
}
}
if build_state_changed {
self.session.set_build_state(build_state).map_err(|e| {
AgentError::InternalError(format!(
"failed to persist session effects into build state: {e}"
))
})?;
}
if visibility_changed && let Err(err) = self.publish_committed_visible_set() {
return Err(AgentError::InternalError(format!(
"failed to persist session effects into tool visibility state: {err}"
)));
}
if build_state_changed && let Some(ref handle) = self.mob_authority_handle {
let updated = self
.session
.build_state()
.and_then(|bs| bs.mob_tool_authority_context);
if let Some(authority) = updated {
*handle
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = authority;
}
}
Ok(())
}
/// Attaches the shared mob-authority context `handle` to this agent, replacing any
/// previously attached handle.
pub fn set_mob_authority_handle(
&mut self,
handle: Arc<std::sync::RwLock<crate::service::MobToolAuthorityContext>>,
) {
self.mob_authority_handle = Some(handle);
}
/// Replaces the agent's LLM client with `client`.
pub fn replace_client(&mut self, client: Arc<C>) {
self.client = client;
}
pub fn apply_llm_request_policy(&mut self, policy: crate::SessionLlmRequestPolicy) {
self.config.model = policy.model;
self.config.provider_params = policy.provider_params;
self.config.provider_tool_defaults = policy.provider_tool_defaults;
}
/// Replaces the LLM client and then applies `policy` to the agent configuration,
/// in that order.
pub fn replace_client_with_request_policy(
&mut self,
client: Arc<C>,
policy: crate::SessionLlmRequestPolicy,
) {
self.replace_client(client);
self.apply_llm_request_policy(policy);
}
/// Moves the auth lease from the `previous` auth binding to the `target` one.
///
/// Does nothing when no auth-lease handle is attached or when both bindings are equal.
/// Otherwise the lease for `previous` is released (the result is ignored), and a lease
/// for `target` is acquired unless its snapshot already reports a present credential
/// together with a phase.
///
/// # Errors
/// Returns `AgentError::ConfigError` when acquiring the target lease fails.
pub fn rotate_auth_lease_auth_binding(
    &self,
    previous: Option<&crate::AuthBindingRef>,
    target: Option<&crate::AuthBindingRef>,
) -> Result<(), AgentError> {
    let lease_handle = match self.auth_lease_handle.as_deref() {
        Some(handle) if previous != target => handle,
        _ => return Ok(()),
    };

    if let Some(old_binding) = previous {
        let old_key = crate::handles::LeaseKey::from_auth_binding(old_binding);
        // Best effort: a failed release does not block acquiring the new lease.
        let _ = lease_handle.release_lease(&old_key);
    }

    let Some(new_binding) = target else {
        return Ok(());
    };
    let target_key = crate::handles::LeaseKey::from_auth_binding(new_binding);
    let existing = lease_handle.snapshot(&target_key);
    let already_leased = existing.credential_present && existing.phase.is_some();
    if already_leased {
        return Ok(());
    }

    lease_handle
        .acquire_lease(&target_key, u64::MAX)
        .map_err(|err| {
            AgentError::ConfigError(format!(
                "failed to rotate auth lease to auth_binding {target_key}: {err}"
            ))
        })?;
    Ok(())
}
/// Returns a new `Arc` pointing at the agent's `cancel_after_boundary_requested` flag.
pub fn cancel_after_boundary_handle(&self) -> Arc<std::sync::atomic::AtomicBool> {
Arc::clone(&self.cancel_after_boundary_requested)
}
/// Snapshots the tool scope's visibility state and writes it into the session.
///
/// # Errors
/// Returns `AgentError::InternalError` when the snapshot cannot be taken or when the
/// session rejects the state.
pub(crate) fn publish_committed_visible_set(&mut self) -> Result<(), AgentError> {
    let visibility_state = match self.tool_scope.visibility_state() {
        Ok(state) => state,
        Err(err) => {
            return Err(AgentError::InternalError(format!(
                "failed to snapshot canonical tool visibility state: {err}"
            )));
        }
    };

    match self.session.set_tool_visibility_state(visibility_state) {
        Ok(()) => Ok(()),
        Err(err) => Err(AgentError::InternalError(format!(
            "failed to persist canonical tool visibility state: {err}"
        ))),
    }
}
/// Dispatches `call` with the timeout policy set to
/// `ToolDispatchTimeoutPolicy::Disabled`.
///
/// Delegates to `dispatch_external_tool_call_with_timeout_policy` and returns its
/// result unchanged.
pub async fn dispatch_external_tool_call(
&mut self,
call: crate::types::ToolCall,
) -> Result<ToolDispatchOutcome, AgentError> {
self.dispatch_external_tool_call_with_timeout_policy(
call,
ToolDispatchTimeoutPolicy::Disabled,
)
.await
}
pub async fn dispatch_external_tool_call_with_timeout_policy(
&mut self,
call: crate::types::ToolCall,
timeout_policy: ToolDispatchTimeoutPolicy,
) -> Result<ToolDispatchOutcome, AgentError> {
let visible_tool_names = self
.tool_scope
.visible_tool_names()
.map_err(|err| AgentError::InternalError(err.to_string()))?
.into_iter()
.collect::<ToolNameSet>();
if let Err(error) =
precheck_visible_tool_call(self.tools.as_ref(), &visible_tool_names, call.name.as_str())
{
return Ok(crate::ops::terminal_tool_outcome_for_error(call.id, error));
}
let args = to_raw_value(&call.args).map_err(|err| {
AgentError::InternalError(format!(
"failed to serialize external tool-call arguments: {err}"
))
})?;
let view = ToolCallView {
id: &call.id,
name: &call.name,
args: args.as_ref(),
};
let dispatch_result = match timeout_policy.timeout() {
Some(timeout) => match tokio::time::timeout(timeout, self.tools.dispatch(view)).await {
Ok(result) => result,
Err(_) => Err(crate::error::ToolError::timeout(
call.name.clone(),
timeout_policy.timeout_ms().unwrap_or(u64::MAX),
)),
},
None => self.tools.dispatch(view).await,
};
match dispatch_result {
Ok(mut outcome) => {
outcome.clear_terminal_cause();
if outcome.result.tool_use_id.is_empty() {
outcome.result.tool_use_id = call.id;
}
if !outcome.session_effects.is_empty() {
self.apply_session_effects(&outcome.session_effects)?;
}
Ok(outcome)
}
Err(crate::error::ToolError::CallbackPending { tool_name, args }) => {
Err(AgentError::CallbackPending { tool_name, args })
}
Err(error) => Ok(crate::ops::terminal_tool_outcome_for_error(call.id, error)),
}
}
/// Test-only hook that forwards to the tool scope's
/// `inject_boundary_failure_once_for_test`.
#[cfg(test)]
pub(crate) fn inject_tool_scope_boundary_failure_once_for_test(&self) {
self.tool_scope.inject_boundary_failure_once_for_test();
}
}
impl<C, T, S> Agent<C, T, S>
where
C: AgentLlmClient + ?Sized + 'static,
T: AgentToolDispatcher + ?Sized + 'static,
S: AgentSessionStore + ?Sized + 'static,
{
/// Creates a fresh [`AgentBuilder`] via `AgentBuilder::new()`.
pub fn builder() -> AgentBuilder {
AgentBuilder::new()
}
/// Returns a shared reference to the agent's session.
pub fn session(&self) -> &Session {
&self.session
}
/// Returns a mutable reference to the agent's session.
pub fn session_mut(&mut self) -> &mut Session {
&mut self.session
}
/// Returns a shared reference to the agent's budget.
pub fn budget(&self) -> &Budget {
&self.budget
}
/// Returns the loop state from the current execution snapshot, or
/// `LoopState::CallingLlm` when no snapshot is available.
pub fn state(&self) -> LoopState {
    match self.execution_snapshot() {
        Some(snapshot) => snapshot.loop_state,
        None => LoopState::CallingLlm,
    }
}
/// Builds an execution snapshot from the attached turn-state handle.
///
/// Returns `None` when no handle is attached or when the conversion fails.
pub fn execution_snapshot(&self) -> Option<crate::AgentExecutionSnapshot> {
    match self.turn_state_handle.as_deref() {
        Some(handle) => runtime_execution_snapshot(handle, self.applied_cursor),
        None => None,
    }
}
/// Returns whatever snapshot the tool scope reports, which may be `None`.
pub fn tool_scope_snapshot(&self) -> Option<crate::ToolScopeSnapshot> {
self.tool_scope.snapshot()
}
/// Returns a snapshot of the external tool surface.
///
/// The attached external-tool-surface handle is preferred. When no handle is attached,
/// or its snapshot cannot be converted (which is logged), the dispatcher's own snapshot
/// is returned instead.
pub fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
    let Some(handle) = self.external_tool_surface_handle.as_deref() else {
        return self.tools.external_tool_surface_snapshot();
    };
    match runtime_external_tool_surface_snapshot(handle) {
        Some(snapshot) => Some(snapshot),
        None => {
            tracing::warn!(
                "failed to convert runtime external-tool-surface snapshot; falling back to dispatcher snapshot"
            );
            self.tools.external_tool_surface_snapshot()
        }
    }
}
/// Returns a shared reference to the agent's retry policy.
pub fn retry_policy(&self) -> &RetryPolicy {
&self.retry_policy
}
/// Returns the agent's stored `depth` value.
pub fn depth(&self) -> u32 {
self.depth
}
/// Returns a shared reference to the agent's event tap.
pub fn event_tap(&self) -> &crate::event_tap::EventTap {
&self.event_tap
}
/// Returns a shared reference to the agent's tool scope.
pub fn tool_scope(&self) -> &crate::ToolScope {
&self.tool_scope
}
/// Returns a new `Arc` pointing at the agent's mutex-guarded system-context state.
pub fn system_context_state(
&self,
) -> Arc<std::sync::Mutex<crate::session::SessionSystemContextState>> {
Arc::clone(&self.system_context_state)
}
/// Returns a clone of the session with the current system-context state and tool
/// visibility state written into it.
///
/// A poisoned system-context lock is recovered with a warning. Failures to write
/// either state into the clone are logged and otherwise ignored, and a visibility
/// state that cannot be read is skipped silently.
pub fn session_with_system_context_state(&self) -> Session {
    let mut snapshot = self.session.clone();

    // The guard is a temporary, so the lock is released at the end of this statement.
    let context_state = self
        .system_context_state
        .lock()
        .unwrap_or_else(|poisoned| {
            tracing::warn!("system-context state lock poisoned while cloning session");
            poisoned.into_inner()
        })
        .clone();
    if let Err(err) = snapshot.set_system_context_state(context_state) {
        tracing::warn!(error = %err, "failed to serialize system-context state into session");
    }

    if let Ok(visibility_state) = self.tool_scope.visibility_state() {
        if let Err(err) = snapshot.set_tool_visibility_state(visibility_state) {
            tracing::warn!(error = %err, "failed to serialize tool visibility state into session");
        }
    }
    snapshot
}
/// Copies the current system-context state into the agent's own session.
///
/// A poisoned lock is recovered with a warning; a failure to write the state into the
/// session is logged and otherwise ignored.
#[doc(hidden)]
pub fn sync_system_context_state_to_session(&mut self) {
    // The guard is a temporary, so the lock is released at the end of this statement.
    let context_state = self
        .system_context_state
        .lock()
        .unwrap_or_else(|poisoned| {
            tracing::warn!("system-context state lock poisoned while syncing session");
            poisoned.into_inner()
        })
        .clone();
    if let Err(err) = self.session.set_system_context_state(context_state) {
        tracing::warn!(error = %err, "failed to serialize system-context state into session");
    }
}
/// Returns the pending system-context appends and marks them as applied.
///
/// When nothing is pending, an empty vector is returned and the session is left
/// untouched. Otherwise the updated state is synced into the session before returning.
/// A poisoned lock is recovered with a warning.
pub(crate) fn take_pending_system_context_boundary(
    &mut self,
) -> Vec<PendingSystemContextAppend> {
    let drained = {
        let mut guard = self.system_context_state.lock().unwrap_or_else(|poisoned| {
            tracing::warn!("system-context state lock poisoned while applying boundary");
            poisoned.into_inner()
        });
        if guard.pending.is_empty() {
            return Vec::new();
        }
        let snapshot = guard.pending.clone();
        guard.mark_pending_applied();
        snapshot
        // `guard` drops here: the sync below takes the same lock again.
    };
    self.sync_system_context_state_to_session();
    drained
}
/// Returns the session messages with `appends` applied as system-context blocks.
///
/// The appends are applied to a clone of the session, so the agent's own session is
/// not modified. With no appends the current messages are returned as they are.
pub(crate) fn llm_messages_with_runtime_system_context(
    &self,
    appends: &[PendingSystemContextAppend],
) -> Vec<Message> {
    if !appends.is_empty() {
        let mut scratch = self.session.clone();
        scratch.append_system_context_blocks(appends);
        return scratch.messages().to_vec();
    }
    self.session.messages().to_vec()
}
#[doc(hidden)]
pub async fn checkpoint_current_session(&mut self) {
self.sync_system_context_state_to_session();
if let Some(ref cp) = self.checkpointer {
cp.checkpoint(&self.session).await;
}
}
/// Executes the run-started hooks for `prompt`.
///
/// # Errors
/// Propagates a hook execution failure, or returns the denial error reported for
/// `HookPoint::RunStarted`.
async fn run_started_hooks(
    &self,
    prompt: &ContentInput,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
) -> Result<(), AgentError> {
    let invocation = HookInvocation::run_started(self.session.id().clone(), prompt.clone());
    let report = self.execute_hooks(invocation, event_tx).await?;
    match report.denial_error(HookPoint::RunStarted) {
        Some(denial) => Err(denial),
        None => Ok(()),
    }
}
/// Executes the run-completed hooks for `result`.
///
/// `run_completed_hooks_applied` is set only when the hooks ran without denying.
///
/// # Errors
/// Propagates a hook execution failure, or returns the denial error reported for
/// `HookPoint::RunCompleted`.
pub(super) async fn run_completed_hooks(
    &mut self,
    result: &mut RunResult,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
) -> Result<(), AgentError> {
    let invocation = HookInvocation::run_completed(self.session.id().clone(), result.turns);
    let report = self.execute_hooks(invocation, event_tx).await?;
    if let Some(denial) = report.denial_error(HookPoint::RunCompleted) {
        Err(denial)
    } else {
        self.run_completed_hooks_applied = true;
        Ok(())
    }
}
/// Emits `AgentEvent::RunCompleted` for `result` through the event tap and `event_tx`.
/// The emission result is ignored.
pub(super) async fn emit_run_completed_event(
    &self,
    result: &RunResult,
    extraction_required: bool,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
) {
    let event = AgentEvent::RunCompleted {
        session_id: self.session.id().clone(),
        result: result.text.clone(),
        structured_output: result.structured_output.clone(),
        extraction_required,
        usage: result.usage.clone(),
        terminal_cause_kind: result.terminal_cause_kind,
    };
    let _ = crate::event_tap::tap_emit(&self.event_tap, event_tx, event).await;
}
/// Emits `AgentEvent::ExtractionSucceeded` carrying the structured output and any
/// schema warnings. The emission result is ignored.
pub(super) async fn emit_extraction_succeeded_event(
    &self,
    structured_output: serde_json::Value,
    schema_warnings: Option<Vec<crate::schema::SchemaWarning>>,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
) {
    let event = AgentEvent::ExtractionSucceeded {
        session_id: self.session.id().clone(),
        structured_output,
        schema_warnings,
    };
    let _ = crate::event_tap::tap_emit(&self.event_tap, event_tx, event).await;
}
/// Emits `AgentEvent::ExtractionFailed` with the last output, attempt count, and
/// reason taken from `error`. The emission result is ignored.
pub(super) async fn emit_extraction_failed_event(
    &self,
    error: &crate::types::ExtractionError,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
) {
    let event = AgentEvent::ExtractionFailed {
        session_id: self.session.id().clone(),
        last_output: error.last_output.clone(),
        attempts: error.attempts,
        reason: error.reason.clone(),
    };
    let _ = crate::event_tap::tap_emit(&self.event_tap, event_tx, event).await;
}
/// Emits `AgentEvent::RunStarted` for `prompt` through the event tap and `event_tx`.
/// The emission result is ignored.
async fn emit_run_started_event(
    &self,
    prompt: ContentInput,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
) {
    let event = AgentEvent::RunStarted {
        session_id: self.session.id().clone(),
        prompt,
    };
    let _ = crate::event_tap::tap_emit(&self.event_tap, event_tx, event).await;
}
async fn emit_run_failed_event(
&self,
error: &AgentError,
event_tx: Option<&mpsc::Sender<AgentEvent>>,
) {
let error_report = crate::event::AgentErrorReport::from_agent_error(error);
let terminal_cause_kind = match error {
AgentError::TerminalFailure { cause_kind, .. }
if cause_kind.is_specific_failure_cause() =>
{
Some(*cause_kind)
}
_ => self
.execution_snapshot()
.and_then(|snapshot| snapshot.terminal_cause_kind)
.filter(|cause_kind| *cause_kind != TurnTerminalCauseKind::Unknown),
};
let _ = crate::event_tap::tap_emit(
&self.event_tap,
event_tx,
AgentEvent::RunFailed {
session_id: self.session.id().clone(),
error_class: error_report.class,
error: error_report.message.clone(),
terminal_cause_kind,
error_report: Some(error_report),
},
)
.await;
}
/// Reports a failed run: runs the run-failed hooks, then emits the run-failed event.
///
/// A failure of the hooks themselves is only logged; the event is emitted regardless.
async fn handle_run_failure(
&self,
error: &AgentError,
event_tx: Option<&mpsc::Sender<AgentEvent>>,
) {
if let Err(hook_err) = self.run_failed_hooks(error, event_tx).await {
tracing::warn!(?hook_err, "run_failed hook execution failed");
}
self.emit_run_failed_event(error, event_tx).await;
}
async fn run_failed_hooks(
&self,
error: &AgentError,
event_tx: Option<&mpsc::Sender<AgentEvent>>,
) -> Result<(), AgentError> {
let report = self
.execute_hooks(
HookInvocation::run_failed(self.session.id().clone(), error),
event_tx,
)
.await?;
if let Some(error) = report.denial_error(HookPoint::RunFailed) {
return Err(error);
}
Ok(())
}
/// Runs the agent on `user_input` without an explicit event sender.
pub async fn run(&mut self, user_input: ContentInput) -> Result<RunResult, AgentError> {
self.run_inner(user_input, None).await
}
/// Runs the agent on `user_input`, passing `event_tx` as the event sender.
pub async fn run_with_events(
&mut self,
user_input: ContentInput,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, AgentError> {
self.run_inner(user_input, Some(event_tx)).await
}
/// Continues from the session's pending boundary without an explicit event sender.
pub async fn run_pending(&mut self) -> Result<RunResult, AgentError> {
self.run_pending_inner(None).await
}
/// Continues from the session's pending boundary, passing `event_tx` as the event
/// sender.
pub async fn run_pending_with_events(
&mut self,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, AgentError> {
self.run_pending_inner(Some(event_tx)).await
}
/// Starts a new run from `user_input`.
///
/// Falls back to the default event sender when `event_tx` is `None`, checks that the
/// runtime execution kind is present when required, resets the per-run state, prefixes
/// any pending skill text to the input, pushes the resulting user message onto the
/// session, and then drives the shared run lifecycle.
///
/// # Errors
/// Returns the error from the execution-kind check, or any error produced by the run
/// lifecycle (started hooks, the loop itself, completed hooks).
async fn run_inner(
    &mut self,
    user_input: ContentInput,
    event_tx: Option<mpsc::Sender<AgentEvent>>,
) -> Result<RunResult, AgentError> {
    let event_tx = event_tx.or_else(|| self.default_event_tx.clone());
    self.require_runtime_execution_kind()?;
    self.extraction_state.reset();
    self.run_completed_hooks_applied = false;
    self.run_completed_event_emitted = false;

    let user_input = if user_input.has_non_text_content() {
        // Non-text input keeps its blocks; skill text, if any, becomes a leading text block.
        let skill_text = self.apply_skill_ref(String::new()).await;
        if skill_text.is_empty() {
            user_input
        } else {
            let mut blocks = vec![crate::types::ContentBlock::Text { text: skill_text }];
            blocks.extend(user_input.into_blocks());
            ContentInput::Blocks(blocks)
        }
    } else {
        let text = self.apply_skill_ref(user_input.text_content()).await;
        ContentInput::Text(text)
    };

    let run_prompt_input = user_input.clone();
    let user_message = if user_input.has_non_text_content() {
        crate::types::UserMessage::with_blocks(user_input.into_blocks())
    } else {
        crate::types::UserMessage::text(user_input.text_content())
    };
    self.session.push(Message::User(user_message));

    self.drive_run_lifecycle(run_prompt_input, event_tx).await
}

/// Continues a run from the boundary already present at the end of the session.
///
/// The last session message must be a user message (its text becomes the prompt) or a
/// tool-results message (the prompt is empty). Nothing is pushed onto the session.
///
/// # Errors
/// Returns `AgentError::ConfigError` when the session does not end in such a boundary
/// (the runtime execution kind is cleared in that case), the error from the
/// execution-kind check, or any error produced by the run lifecycle.
pub(super) async fn run_pending_inner(
    &mut self,
    event_tx: Option<mpsc::Sender<AgentEvent>>,
) -> Result<RunResult, AgentError> {
    let event_tx = event_tx.or_else(|| self.default_event_tx.clone());
    let pending_prompt = self.session.messages().last().and_then(|m| match m {
        Message::User(u) => Some(u.text_content()),
        Message::ToolResults { .. } => Some(String::new()),
        _ => None,
    });
    let Some(prompt) = pending_prompt else {
        self.clear_runtime_execution_kind();
        return Err(AgentError::ConfigError(
            "run_pending requires a pending user or tool-results continuation boundary in the session".to_string(),
        ));
    };
    self.require_runtime_execution_kind()?;
    self.extraction_state.reset();
    self.run_completed_hooks_applied = false;
    self.run_completed_event_emitted = false;

    self.drive_run_lifecycle(ContentInput::Text(prompt), event_tx)
        .await
}

/// Shared tail of `run_inner` and `run_pending_inner`.
///
/// Emits the run-started event, runs the lifecycle stages, reports a failure through
/// `handle_run_failure` if any stage failed, and always clears the runtime execution
/// kind before returning the outcome.
async fn drive_run_lifecycle(
    &mut self,
    run_prompt: ContentInput,
    event_tx: Option<mpsc::Sender<AgentEvent>>,
) -> Result<RunResult, AgentError> {
    self.emit_run_started_event(run_prompt.clone(), event_tx.as_ref())
        .await;
    let outcome = self
        .run_lifecycle_stages(&run_prompt, event_tx.as_ref())
        .await;
    if let Err(err) = &outcome {
        self.handle_run_failure(err, event_tx.as_ref()).await;
    }
    self.clear_runtime_execution_kind();
    outcome
}

/// Runs the fallible stages of a run in order: started hooks, the run loop, completed
/// hooks (unless already applied), the completed event (unless already emitted), and
/// a session checkpoint. The first failing stage short-circuits the rest.
async fn run_lifecycle_stages(
    &mut self,
    run_prompt: &ContentInput,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
) -> Result<RunResult, AgentError> {
    self.run_started_hooks(run_prompt, event_tx).await?;
    let mut result = self.run_loop(event_tx.cloned()).await?;
    if !self.run_completed_hooks_applied {
        self.run_completed_hooks(&mut result, event_tx).await?;
    }
    if !self.run_completed_event_emitted {
        self.emit_run_completed_event(&result, false, event_tx)
            .await;
        self.run_completed_event_emitted = true;
    }
    self.checkpoint_current_session().await;
    Ok(result)
}
/// Cancels the current turn.
///
/// Clears the runtime execution kind, then applies `CancelNow` for the active run id
/// reported by the turn-state handle, or `ForceCancelNoRun` when there is no handle or
/// no active run. The result of applying the input is ignored.
pub fn cancel(&mut self) {
    use crate::turn_execution_authority::TurnExecutionInput;

    self.clear_runtime_execution_kind();
    let active_run_id = self
        .turn_state_handle
        .as_deref()
        .map(crate::handles::TurnStateHandle::snapshot)
        .and_then(|snapshot| snapshot.active_run_id);
    let input = if let Some(run_id) = active_run_id {
        TurnExecutionInput::CancelNow { run_id }
    } else {
        TurnExecutionInput::ForceCancelNoRun
    };
    let _ = self.apply_turn_input(input);
}
async fn apply_skill_ref(&mut self, user_input: String) -> String {
let engine = match &self.skill_engine {
Some(e) => e.clone(),
None => return user_input,
};
let mut prefix_parts: Vec<String> = Vec::new();
if let Some(refs) = self.pending_skill_references.take()
&& !refs.is_empty()
{
let canonical_keys: Vec<crate::skills::SkillKey> = refs.into_iter().collect();
match engine.resolve_and_render(&canonical_keys).await {
Ok(resolved) => {
for skill in &resolved {
tracing::info!(
skill_key = %skill.key,
"Per-turn skill activation via skill_references"
);
prefix_parts.push(skill.rendered_body.clone());
}
}
Err(e) => {
tracing::warn!(
error = %e,
"Failed to resolve source-pinned skill_references"
);
}
}
}
if prefix_parts.is_empty() {
return user_input;
}
if user_input.is_empty() {
prefix_parts.join("\n\n")
} else {
format!("{}\n\n{user_input}", prefix_parts.join("\n\n"))
}
}
}