mod dispatch;
mod guardrails;
mod options;
mod retry;
mod step;
mod streaming;
mod streaming_forward;
use crate::agent::AgentContext;
use crate::error::Error;
use crate::ids::ThreadId;
use crate::llm::{ChatRequest, Message, Role};
use crate::memory::Episode;
use std::time::Instant;
use tracing::{error, instrument};
use guardrails::{check_post_llm, check_pre_llm};
use retry::complete_with_retry;
use step::{record_and_dispatch, StepDisposition, StepOutcome};
pub use options::RunOptions;
pub use streaming::run_steps_streaming;
/// Drives the agent's step loop for `thread` until a step yields final content.
///
/// An [`Episode::Started`] entry carrying the agent name is recorded first.
/// Every iteration then, in order:
///
/// 1. returns [`Error::Cancelled`] if `ctx.cancel` reports cancellation;
/// 2. returns [`Error::MaxStepsExceeded`] if `opts.max_steps` steps have
///    already been started;
/// 3. builds the request with [`build_request`] and passes it to
///    `check_pre_llm`;
/// 4. sends the request through `complete_with_retry`, timing the call;
/// 5. passes the request and response to `check_post_llm`;
/// 6. hands the resulting [`StepOutcome`] to `record_and_dispatch`, which
///    decides whether the loop is done or continues.
///
/// # Errors
///
/// Besides the two errors named above, any error returned by the episodic
/// store, request building, the guardrail checks, the LLM call, or
/// `record_and_dispatch` is propagated unchanged.
#[instrument(level = "debug", skip(ctx, system_prompt), fields(run_id = %ctx.run_id))]
pub async fn run_steps(
    ctx: &AgentContext,
    system_prompt: &str,
    thread: ThreadId,
    opts: RunOptions,
) -> Result<String, Error> {
    ctx.episodic
        .record(
            ctx.run_id,
            Episode::Started {
                agent: ctx.agent_name.clone(),
            },
        )
        .await?;

    let mut step = 0u32;
    loop {
        // Cancellation is checked once per iteration, before any new work starts.
        if ctx.cancel.is_cancelled() {
            error!("cancelled");
            return Err(Error::Cancelled);
        }
        // `step` counts iterations already started, so this allows exactly
        // `opts.max_steps` iterations before giving up.
        if step >= opts.max_steps {
            return Err(Error::MaxStepsExceeded {
                steps: opts.max_steps,
            });
        }
        step += 1;

        let req = build_request(ctx, system_prompt, &thread, opts.max_history_tokens).await?;
        check_pre_llm(&opts.guardrails, &req).await?;

        let started = Instant::now();
        // The request is cloned because the post-LLM check below still needs it.
        let resp = complete_with_retry(ctx.llm.as_ref(), &ctx.cancel, req.clone()).await?;
        // `as_millis` yields a `u128`; clamp before narrowing so an oversized
        // value saturates at `u32::MAX` instead of silently wrapping.
        let latency_ms = started.elapsed().as_millis().min(u128::from(u32::MAX)) as u32;

        check_post_llm(&opts.guardrails, &req, &resp).await?;

        let outcome = StepOutcome {
            message: resp.message,
            finish_reason: resp.finish_reason,
            usage: resp.usage,
            latency_ms,
        };
        // NOTE(review): "blocking" presumably labels this non-streaming path
        // for the recorder — confirm against `record_and_dispatch`.
        match record_and_dispatch(ctx, &thread, step, outcome, "blocking").await? {
            StepDisposition::Done(content) => return Ok(content),
            StepDisposition::Continue => continue,
        }
    }
}
pub(super) async fn build_request(
ctx: &AgentContext,
system_prompt: &str,
thread: &ThreadId,
max_history_tokens: usize,
) -> Result<ChatRequest, Error> {
let history = ctx
.short_term
.load(thread.clone(), max_history_tokens)
.await?;
let mut messages = Vec::with_capacity(history.len() + 1);
if !system_prompt.is_empty() {
messages.push(Message {
role: Role::System,
content: system_prompt.into(),
tool_calls: vec![],
tool_call_id: None,
});
}
messages.extend(history);
Ok(ChatRequest {
messages,
tools: ctx.tools.catalogue(),
..ChatRequest::new(vec![])
})
}