mod types;
pub use types::*;
use crate::error::{Result, TinyAgentsError};
use crate::harness::context::{RunConfig, RunContext};
use crate::harness::events::{AgentEvent, HarnessRunStatus};
use crate::harness::ids::{CallId, ComponentId, HarnessPhase};
use crate::harness::message::Message;
use crate::harness::middleware::AgentRun;
use crate::harness::model::{
ModelRequest, ModelResolutionSource, ModelResponse, ResolvedModel, ResolvedModelBinding,
ResponseFormat,
};
use crate::harness::retry::is_retryable;
use crate::harness::runtime::AgentHarness;
use crate::harness::structured::{StructuredExtractor, StructuredStrategy};
impl<State: Send + Sync, Ctx: Send + Sync> AgentHarness<State, Ctx> {
/// Runs the agent with a fresh context built from `config` and `ctx_data`,
/// returning only the [`AgentRun`].
///
/// This is [`Self::invoke_with_status`] with the status half of the result
/// dropped.
///
/// # Errors
///
/// Returns whatever error [`Self::invoke_with_status`] returns.
pub async fn invoke(
    &self,
    state: &State,
    ctx_data: Ctx,
    config: RunConfig,
    input: Vec<Message>,
) -> Result<AgentRun> {
    let outcome = self
        .invoke_with_status(state, ctx_data, config, input)
        .await?;
    Ok(outcome.run)
}
/// Runs the agent with `Ctx::default()` as context data and a run
/// configuration created via `RunConfig::new("run")`.
///
/// # Errors
///
/// Returns whatever error [`Self::invoke`] returns.
pub async fn invoke_default(&self, state: &State, input: Vec<Message>) -> Result<AgentRun>
where
    Ctx: Default,
{
    let ctx_data = Ctx::default();
    let config = RunConfig::new("run");
    self.invoke(state, ctx_data, config, input).await
}
/// Runs the agent with a fresh [`RunContext`] built from `config` and
/// `ctx_data`, returning both the run and its final status.
///
/// # Errors
///
/// Returns the error that made the agent loop fail.
pub async fn invoke_with_status(
    &self,
    state: &State,
    ctx_data: Ctx,
    config: RunConfig,
    input: Vec<Message>,
) -> Result<AgentLoopResult> {
    self.drive(state, RunContext::new(config, ctx_data), input)
        .await
}
pub async fn invoke_in_context(
&self,
state: &State,
ctx: RunContext<Ctx>,
input: Vec<Message>,
) -> Result<AgentRun> {
self.drive(state, ctx, input).await.map(|result| result.run)
}
pub async fn invoke_in_context_with_status(
&self,
state: &State,
ctx: RunContext<Ctx>,
input: Vec<Message>,
) -> Result<AgentLoopResult> {
self.drive(state, ctx, input).await
}
/// Sets up the bookkeeping for one run, executes the agent loop and turns
/// its outcome into an [`AgentLoopResult`].
///
/// A [`HarnessRunStatus`] is created for the `"agent_loop"` component and
/// tagged with the thread id when the run configuration carries one.
///
/// # Errors
///
/// When the loop fails, a `RunFailed` event is emitted, the status is marked
/// failed, the `on_error` middleware hook runs (its own result is ignored),
/// and the original error is returned.
async fn drive(
    &self,
    state: &State,
    mut ctx: RunContext<Ctx>,
    input: Vec<Message>,
) -> Result<AgentLoopResult> {
    let run_id = ctx.config.run_id.clone();
    let thread_id = ctx.config.thread_id.clone();
    let fresh = HarnessRunStatus::new(run_id.clone(), ComponentId::new("agent_loop"));
    let mut status = match thread_id {
        Some(thread) => fresh.with_thread(thread),
        None => fresh,
    };
    let mut run = AgentRun::new();

    let outcome = self
        .run_loop(state, &mut ctx, &mut run, &mut status, input)
        .await;

    // Failure path first, so the success path reads straight through.
    if let Err(error) = outcome {
        let failure = ctx.emit(AgentEvent::RunFailed {
            run_id,
            error: error.to_string(),
        });
        status.set_last_event(failure.id);
        status.mark_failed(error.to_string());
        // Best effort: an error raised by the error hook must not mask the
        // error that actually failed the run.
        let _ = self.middleware.run_on_error(&mut ctx, &error).await;
        return Err(error);
    }

    status.mark_completed();
    Ok(AgentLoopResult { run, status })
}
/// Runs the model/tool loop for one agent run, updating `run` and `status`
/// as it progresses.
///
/// Emits `RunStarted`, runs the `before_agent` middleware hook, then repeats:
/// apply pending steering, check the deadline and the model-call limit, build
/// and send a model request, and execute every tool call in the response.
/// The loop ends when a response contains no tool calls (that response becomes
/// `run.final_response`) or when steering reports `Pause`. After the loop the
/// accumulated messages are stored in `run.messages`, the `after_agent` hook
/// runs and `RunCompleted` is emitted.
///
/// # Errors
///
/// Returns `TinyAgentsError::Cancelled` when steering requests cancellation,
/// `Timeout` when a deadline check fails, `LimitExceeded` when a model-call or
/// tool-call limit is hit, `ModelNotFound` / `ToolNotFound` when a lookup
/// fails, and propagates errors from steering, middleware hooks, the model,
/// tools and structured extraction.
async fn run_loop(
&self,
state: &State,
ctx: &mut RunContext<Ctx>,
run: &mut AgentRun,
status: &mut HarnessRunStatus,
input: Vec<Message>,
) -> Result<()> {
let record = ctx.emit(AgentEvent::RunStarted {
run_id: ctx.run_id().clone(),
thread_id: ctx.thread_id().cloned(),
});
status.set_last_event(record.id);
status.mark_running(HarnessPhase::Idle);
// The conversation starts as the caller's input and grows with every
// assistant message and tool result appended below.
let mut messages = input;
status.mark_running(HarnessPhase::Middleware);
self.middleware.run_before_agent(ctx, state).await?;
loop {
// Steering is applied at the top of every iteration, before the deadline
// and limit checks.
match crate::harness::steering::apply_pending_steering(ctx, &mut messages)? {
crate::harness::steering::SteeringOutcome::Cancel => {
return Err(TinyAgentsError::Cancelled);
}
// Pause leaves the loop; the post-loop steps (after_agent hook,
// RunCompleted event) still run.
crate::harness::steering::SteeringOutcome::Pause => break,
crate::harness::steering::SteeringOutcome::Continue => {}
}
// The error from check_deadline is discarded and replaced with a Timeout
// that names the run.
ctx.check_deadline().map_err(|_| {
TinyAgentsError::Timeout(format!(
"run `{}` exceeded its wall-clock deadline",
ctx.run_id()
))
})?;
// Guard on this run's own counter; ctx.record_model_call() further down
// can also fail and is reported with the same message.
if run.model_calls >= self.policy.limits.max_model_calls {
return Err(TinyAgentsError::LimitExceeded(format!(
"max model calls ({}) reached",
self.policy.limits.max_model_calls
)));
}
status.mark_running(HarnessPhase::BuildingRequest);
// Each request carries a copy of the full conversation so far plus the
// schemas of all registered tools.
let mut request = ModelRequest::new(messages.clone()).with_tools(self.tools.schemas());
if let Some(format) = &self.policy.default_response_format {
request = request.with_response_format(format.clone());
}
status.mark_running(HarnessPhase::Middleware);
// The before_model hook receives the request mutably, so model resolution
// below uses the request as the hook left it.
self.middleware
.run_before_model(ctx, state, &mut request)
.await?;
ctx.record_model_call().map_err(|_| {
TinyAgentsError::LimitExceeded(format!(
"max model calls ({}) reached",
self.policy.limits.max_model_calls
))
})?;
let binding = self
.models
.resolve_request(&request, None, None)
.ok_or_else(|| {
TinyAgentsError::ModelNotFound(
request
.model
.clone()
.unwrap_or_else(|| "<default>".to_string()),
)
})?;
let model_name = binding.resolved.name.clone();
// run.model_calls is only incremented after the call succeeds, so `+ 1`
// makes the call ids 1-based.
let call_id = CallId::new(format!("{}-model-{}", ctx.run_id(), run.model_calls + 1));
status.mark_running(HarnessPhase::Model);
status.active_model_call = Some(call_id.clone());
let record = ctx.emit(AgentEvent::ModelStarted {
call_id: call_id.clone(),
model: model_name,
});
status.set_last_event(record.id);
let mut response = self
.invoke_model_with_retry(state, ctx, &request, &call_id, binding)
.await?;
status.mark_running(HarnessPhase::Middleware);
self.middleware
.run_after_model(ctx, state, &mut response)
.await?;
// Counters are updated only after the after_model hook has succeeded.
run.model_calls += 1;
run.steps += 1;
status.model_calls = run.model_calls;
status.active_model_call = None;
if let Some(usage) = response.usage {
run.usage.record(usage);
status.usage = run.usage;
}
let record = ctx.emit(AgentEvent::ModelCompleted {
call_id,
usage: response.usage,
});
status.set_last_event(record.id);
messages.push(Message::Assistant(response.message.clone()));
let tool_calls = response.tool_calls().to_vec();
// A response without tool calls is the final answer and ends the loop.
if tool_calls.is_empty() {
// Structured output is extracted only when the policy's default response
// format is a JSON schema; an extraction error fails the run.
if let Some(ResponseFormat::JsonSchema { name, schema }) =
&self.policy.default_response_format
{
let extractor = StructuredExtractor::new(
StructuredStrategy::ProviderSchema,
name.clone(),
schema.clone(),
);
let output = extractor.extract(&response)?;
run.structured = Some(output.value);
}
run.final_response = Some(response);
break;
}
status.mark_running(HarnessPhase::Tools);
// Tool calls run one at a time, in the order the response lists them; the
// deadline and tool-call limit are re-checked before each one.
for mut call in tool_calls {
ctx.check_deadline().map_err(|_| {
TinyAgentsError::Timeout(format!(
"run `{}` exceeded its wall-clock deadline",
ctx.run_id()
))
})?;
if run.tool_calls >= self.policy.limits.max_tool_calls {
return Err(TinyAgentsError::LimitExceeded(format!(
"max tool calls ({}) reached",
self.policy.limits.max_tool_calls
)));
}
ctx.record_tool_call().map_err(|_| {
TinyAgentsError::LimitExceeded(format!(
"max tool calls ({}) reached",
self.policy.limits.max_tool_calls
))
})?;
// The before_tool hook receives the call mutably, so the tool lookup below
// uses the call's name as the hook left it.
self.middleware
.run_before_tool(ctx, state, &mut call)
.await?;
let tool = self
.tools
.get(&call.name)
.ok_or_else(|| TinyAgentsError::ToolNotFound(call.name.clone()))?;
let tool_call_id = CallId::new(call.id.clone());
let tool_name = call.name.clone();
status.active_tool_calls.push(tool_call_id.clone());
let record = ctx.emit(AgentEvent::ToolStarted {
call_id: tool_call_id.clone(),
tool_name: tool_name.clone(),
});
status.set_last_event(record.id);
// A tool error is propagated and ends the run; it is not passed back to
// the model as a message.
let mut result = tool.call(state, call).await?;
self.middleware
.run_after_tool(ctx, state, &mut result)
.await?;
run.tool_calls += 1;
status.tool_calls = run.tool_calls;
status.active_tool_calls.retain(|c| c != &tool_call_id);
let record = ctx.emit(AgentEvent::ToolCompleted {
call_id: tool_call_id,
tool_name,
});
status.set_last_event(record.id);
// The tool result joins the conversation, so the next model request
// includes it.
messages.push(Message::tool(
result.call_id.clone(),
result.content.clone(),
));
}
}
run.messages = messages;
status.mark_running(HarnessPhase::Middleware);
self.middleware.run_after_agent(ctx, state, run).await?;
let record = ctx.emit(AgentEvent::RunCompleted {
run_id: ctx.run_id().clone(),
});
status.set_last_event(record.id);
Ok(())
}
/// Invokes the resolved model, retrying retryable failures and then walking
/// the policy's fallback chain.
///
/// For the current model the request is re-sent while the error is retryable
/// according to [`is_retryable`] and the retry policy allows another attempt;
/// every scheduled retry emits a `RetryScheduled` event tagged with `call_id`.
/// The attempt counter restarts for each model. Once a model's attempts are
/// exhausted, the fallback policy (if configured) is asked for the model name
/// that follows the current one; when that name is registered, the whole
/// process repeats with it.
///
/// A successful response that does not already carry `resolved_model` is
/// stamped with the model that produced it.
///
/// # Errors
///
/// Returns the last model error when there is no fallback policy, the policy
/// yields no further name, or the named model is not registered.
async fn invoke_model_with_retry(
    &self,
    state: &State,
    ctx: &mut RunContext<Ctx>,
    request: &ModelRequest,
    call_id: &CallId,
    binding: ResolvedModelBinding<State>,
) -> Result<ModelResponse> {
    let mut current_name = binding.resolved.name.clone();
    let mut model = binding.model;
    let mut resolved = binding.resolved;
    loop {
        // Retry budget is per model: it starts from zero again after a fallback.
        let mut attempt = 0usize;
        let outcome = loop {
            match model.invoke(state, request.clone()).await {
                Ok(response) => break Ok(response),
                Err(error) => {
                    if is_retryable(&error) && self.policy.retry.should_retry(attempt) {
                        attempt += 1;
                        ctx.emit(AgentEvent::RetryScheduled {
                            call_id: call_id.clone(),
                            attempt,
                        });
                        // NOTE(review): the backoff value is computed but discarded, so
                        // no delay is applied between attempts here — confirm whether a
                        // sleep is intended.
                        let _ = self.policy.retry.backoff_for_attempt(attempt);
                        continue;
                    }
                    break Err(error);
                }
            }
        };
        match outcome {
            Ok(mut response) => {
                if response.resolved_model.is_none() {
                    response.resolved_model = Some(resolved);
                }
                return Ok(response);
            }
            Err(error) => {
                // Ask the fallback policy which model follows the one that just
                // failed. (The borrow here had been corrupted to `¤t_name`.)
                let next = self
                    .policy
                    .fallback
                    .as_ref()
                    .and_then(|fallback| fallback.next_after(&current_name))
                    .map(str::to_owned);
                match next.and_then(|name| self.models.get(&name).map(|m| (name, m))) {
                    Some((name, next_model)) => {
                        resolved = ResolvedModel {
                            name: name.clone(),
                            requested: Some(name.clone()),
                            source: ModelResolutionSource::Hint,
                        };
                        current_name = name;
                        model = next_model;
                        continue;
                    }
                    // No usable fallback: surface the error from the last model tried.
                    None => return Err(error),
                }
            }
        }
    }
}
}
#[cfg(test)]
mod test;