use crate::model::{
Choice, CompletionMeta, Delta, Message, MessageBuilder, Model, Request, Response, Role, Tool,
Usage,
};
use anyhow::Result;
use async_stream::stream;
pub use builder::AgentBuilder;
pub use config::AgentConfig;
use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
use futures_core::Stream;
use futures_util::StreamExt;
use tokio::sync::{mpsc, oneshot};
pub use tool::{AsTool, ToolDescription, ToolRequest, ToolSender};
mod builder;
mod compact;
pub mod config;
pub mod event;
pub mod tool;
fn last_sender(history: &[Message]) -> String {
history
.iter()
.rev()
.find(|m| m.role == Role::User)
.map(|m| m.sender.clone())
.unwrap_or_default()
}
pub struct Agent<M: Model> {
pub config: AgentConfig,
model: M,
tools: Vec<Tool>,
tool_tx: Option<ToolSender>,
}
impl<M: Model> Agent<M> {
fn model_name(&self) -> String {
self.config
.model
.clone()
.unwrap_or_else(|| self.model.active_model())
}
fn build_request(&self, history: &[Message]) -> Request {
let model_name = self.model_name();
let mut messages = Vec::with_capacity(1 + history.len());
if !self.config.system_prompt.is_empty() {
messages.push(Message::system(&self.config.system_prompt));
}
messages.extend(history.iter().map(|m| m.with_agent_tag()));
let mut request = Request::new(model_name)
.with_messages(messages)
.with_tool_choice(self.config.tool_choice.clone())
.with_think(self.config.thinking);
if !self.tools.is_empty() {
request = request.with_tools(self.tools.clone());
}
request
}
pub async fn step(
&self,
history: &mut Vec<Message>,
conversation_id: Option<u64>,
) -> Result<AgentStep> {
let request = self.build_request(history);
let response = self.model.send(&request).await?;
let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
if let Some(msg) = response.message() {
history.push(msg);
}
let mut tool_results = Vec::new();
if !tool_calls.is_empty() {
let sender = last_sender(history);
for tc in &tool_calls {
let result = self
.dispatch_tool(
&tc.function.name,
&tc.function.arguments,
&sender,
conversation_id,
)
.await;
let msg = Message::tool(&result, tc.id.clone(), &tc.function.name);
history.push(msg.clone());
tool_results.push(msg);
}
}
Ok(AgentStep {
response,
tool_calls,
tool_results,
})
}
async fn dispatch_tool(
&self,
name: &str,
args: &str,
sender: &str,
conversation_id: Option<u64>,
) -> String {
let Some(tx) = &self.tool_tx else {
return format!("tool '{name}' called but no tool sender configured");
};
let (reply_tx, reply_rx) = oneshot::channel();
let req = ToolRequest {
name: name.to_owned(),
args: args.to_owned(),
agent: self.config.name.to_string(),
reply: reply_tx,
task_id: None,
sender: sender.into(),
conversation_id,
};
if tx.send(req).is_err() {
return format!("tool channel closed while calling '{name}'");
}
reply_rx
.await
.unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
}
fn stop_reason(step: &AgentStep) -> AgentStopReason {
if step.response.content().is_some() {
AgentStopReason::TextResponse
} else {
AgentStopReason::NoAction
}
}
pub async fn run(
&self,
history: &mut Vec<Message>,
events: mpsc::UnboundedSender<AgentEvent>,
conversation_id: Option<u64>,
) -> AgentResponse {
let mut stream = std::pin::pin!(self.run_stream(history, conversation_id));
let mut response = None;
while let Some(event) = stream.next().await {
if let AgentEvent::Done(ref resp) = event {
response = Some(resp.clone());
}
let _ = events.send(event);
}
response.unwrap_or_else(|| AgentResponse {
final_response: None,
iterations: 0,
stop_reason: AgentStopReason::Error("stream ended without Done".into()),
steps: vec![],
model: self.model_name(),
})
}
pub fn run_stream<'a>(
&'a self,
history: &'a mut Vec<Message>,
conversation_id: Option<u64>,
) -> impl Stream<Item = AgentEvent> + 'a {
stream! {
let mut steps = Vec::new();
let max = self.config.max_iterations;
let model_name = self.model_name();
for _ in 0..max {
let request = self.build_request(history);
let mut builder = MessageBuilder::new(Role::Assistant);
let mut finish_reason = None;
let mut last_meta = CompletionMeta::default();
let mut last_usage = None;
let mut stream_error = None;
let mut tool_begin_emitted = false;
{
let mut chunk_stream = std::pin::pin!(self.model.stream(request));
while let Some(result) = chunk_stream.next().await {
match result {
Ok(chunk) => {
if let Some(text) = chunk.content() {
yield AgentEvent::TextDelta(text.to_owned());
}
if let Some(reason) = chunk.reasoning_content() {
yield AgentEvent::ThinkingDelta(reason.to_owned());
}
if let Some(r) = chunk.reason() {
finish_reason = Some(r.clone());
}
last_meta = chunk.meta.clone();
if chunk.usage.is_some() {
last_usage = chunk.usage.clone();
}
builder.accept(&chunk);
if !tool_begin_emitted {
let calls = builder.peek_tool_calls();
if !calls.is_empty() {
tool_begin_emitted = true;
yield AgentEvent::ToolCallsBegin(calls);
}
}
}
Err(e) => {
stream_error = Some(e.to_string());
break;
}
}
}
}
if let Some(e) = stream_error {
yield AgentEvent::Done(AgentResponse {
final_response: None,
iterations: steps.len(),
stop_reason: AgentStopReason::Error(e),
steps,
model: model_name.clone(),
});
return;
}
let msg = builder.build();
let tool_calls = msg.tool_calls.to_vec();
let content = if msg.content.is_empty() {
None
} else {
Some(msg.content.clone())
};
let response = Response {
meta: last_meta,
choices: vec![Choice {
index: 0,
delta: Delta {
role: Some(Role::Assistant),
content: content.clone(),
reasoning_content: if msg.reasoning_content.is_empty() {
None
} else {
Some(msg.reasoning_content.clone())
},
tool_calls: if tool_calls.is_empty() {
None
} else {
Some(tool_calls.clone())
},
},
finish_reason,
logprobs: None,
}],
usage: last_usage.unwrap_or(Usage {
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 0,
prompt_cache_hit_tokens: None,
prompt_cache_miss_tokens: None,
completion_tokens_details: None,
}),
};
history.push(msg);
let has_tool_calls = !tool_calls.is_empty();
let mut tool_results = Vec::new();
if has_tool_calls {
let sender = last_sender(history);
yield AgentEvent::ToolCallsStart(tool_calls.clone());
for tc in &tool_calls {
let tool_start = std::time::Instant::now();
let result = self
.dispatch_tool(&tc.function.name, &tc.function.arguments, &sender, conversation_id)
.await;
let duration_ms = tool_start.elapsed().as_millis() as u64;
let msg = Message::tool(&result, tc.id.clone(), &tc.function.name);
history.push(msg.clone());
tool_results.push(msg);
yield AgentEvent::ToolResult {
call_id: tc.id.clone(),
output: result,
duration_ms,
};
}
yield AgentEvent::ToolCallsComplete;
}
if let Some(threshold) = self.config.compact_threshold
&& Self::estimate_tokens(history) > threshold
{
if let Some(summary) = self.compact(history).await {
yield AgentEvent::Compact { summary: summary.clone() };
*history = vec![Message::user(&summary)];
yield AgentEvent::TextDelta(
"\n[context compacted]\n".to_owned(),
);
}
continue;
}
let step = AgentStep {
response,
tool_calls,
tool_results,
};
if !has_tool_calls {
let stop_reason = Self::stop_reason(&step);
steps.push(step);
yield AgentEvent::Done(AgentResponse {
final_response: content,
iterations: steps.len(),
stop_reason,
steps,
model: model_name.clone(),
});
return;
}
steps.push(step);
}
let final_response = steps.last().and_then(|s| s.response.content().cloned());
yield AgentEvent::Done(AgentResponse {
final_response,
iterations: steps.len(),
stop_reason: AgentStopReason::MaxIterations,
steps,
model: model_name,
});
}
}
}