use crate::model::{HistoryEntry, MessageBuilder, Model};
use anyhow::Result;
use async_stream::stream;
pub use builder::AgentBuilder;
pub use config::AgentConfig;
use crabllm_core::{ChatCompletionRequest, Provider, Role, Tool, ToolCall, ToolChoice, Usage};
use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
use futures_core::Stream;
use futures_util::{StreamExt, future::join_all, stream::FuturesUnordered};
pub use id::AgentId;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
pub use tool::{AsTool, ToolDispatcher};
mod builder;
mod compact;
pub mod config;
pub mod event;
mod id;
pub mod tool;
/// Build a placeholder assistant message carrying an empty text body.
///
/// Used when a provider response contains no message at all, so callers
/// always have a well-formed assistant turn to report.
fn empty_assistant_message() -> crabllm_core::Message {
    let blank_text = serde_json::Value::String(String::new());
    crabllm_core::Message {
        role: Role::Assistant,
        content: Some(blank_text),
        tool_calls: None,
        tool_call_id: None,
        name: None,
        reasoning_content: None,
        extra: Default::default(),
    }
}
/// Return the sender of the most recent user entry in `history`,
/// or an empty string when no user entry exists.
fn last_sender(history: &[HistoryEntry]) -> String {
    for entry in history.iter().rev() {
        if *entry.role() == Role::User {
            return entry.sender.clone();
        }
    }
    String::new()
}
/// Borrow the textual payload of a tool invocation, success and failure
/// alike — both arms carry a human-readable string.
fn tool_output_text(result: &Result<String, String>) -> &str {
    result.as_ref().map_or_else(String::as_str, String::as_str)
}
/// An LLM-backed agent: a model plus the tool set and configuration it
/// runs with.
pub struct Agent<P: Provider + 'static> {
/// Agent configuration (model name, system prompt, tool choice,
/// iteration limits, compaction threshold, …).
pub config: AgentConfig,
// Model wrapper used to send and stream chat-completion requests.
model: Model<P>,
// Tool schemas advertised to the provider on every request.
tools: Vec<Tool>,
// Executes tool calls; when None, any tool call fails with an
// explanatory error string (see `dispatch_tool`).
dispatcher: Option<Arc<dyn ToolDispatcher>>,
}
impl<P: Provider + 'static> Clone for Agent<P> {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
model: self.model.clone(),
tools: self.tools.clone(),
dispatcher: self.dispatcher.clone(),
}
}
}
impl<P: Provider + 'static> Agent<P> {
/// Owned copy of the configured model identifier.
fn model_name(&self) -> String {
    self.config.model.to_owned()
}
/// Assemble a chat-completion request from the current history.
///
/// Prepends the system prompt (when non-empty), converts each history
/// entry to its wire form, and advertises the agent's tools. A per-call
/// `tool_choice_override` takes precedence over the configured default.
fn build_request(
    &self,
    history: &[HistoryEntry],
    tool_choice_override: Option<&ToolChoice>,
) -> ChatCompletionRequest {
    let mut messages = Vec::with_capacity(history.len() + 1);
    if !self.config.system_prompt.is_empty() {
        messages.push(crabllm_core::Message::system(&self.config.system_prompt));
    }
    for entry in history {
        messages.push(entry.to_wire_message());
    }
    let tool_choice = match tool_choice_override {
        Some(choice) => choice.clone(),
        None => self.config.tool_choice.clone(),
    };
    // Omit the tools field entirely when no tools are registered.
    let tools = (!self.tools.is_empty()).then(|| self.tools.clone());
    ChatCompletionRequest {
        model: self.model_name(),
        messages,
        temperature: None,
        top_p: None,
        max_tokens: None,
        stream: None,
        stop: None,
        tools,
        tool_choice: Some(tool_choice),
        frequency_penalty: None,
        presence_penalty: None,
        seed: None,
        user: None,
        reasoning_effort: self.config.thinking.then(|| "high".to_string()),
        extra: Default::default(),
    }
}
/// Execute a single request/response round against the model.
///
/// Sends the current `history` (plus system prompt) to the model,
/// appends the assistant message to `history`, then runs every
/// requested tool call concurrently and appends each result to
/// `history` as a tool entry, in the original call order.
///
/// Returns an [`AgentStep`] describing the round. When the response
/// carries no message, the step holds a placeholder empty assistant
/// message and nothing is pushed to `history` (tool calls, if any,
/// are reported but not executed).
pub async fn step(
&self,
history: &mut Vec<HistoryEntry>,
conversation_id: Option<u64>,
) -> Result<AgentStep> {
let request = self.build_request(history, None);
let response = self.model.send_ct(request).await?;
let tool_calls: Vec<ToolCall> = response.tool_calls().to_vec();
let finish_reason = response.finish_reason().cloned();
let usage = response.usage.clone().unwrap_or_default();
// No message in the response: report the round with a placeholder
// and skip both the history push and tool execution.
let Some(message) = response.message().cloned() else {
return Ok(AgentStep {
message: empty_assistant_message(),
usage,
finish_reason,
tool_calls,
tool_results: Vec::new(),
});
};
history.push(HistoryEntry::from_message(message.clone()));
let mut tool_results = Vec::new();
if !tool_calls.is_empty() {
// Tool calls are attributed to the most recent user sender.
let sender = last_sender(history);
// Run all dispatches concurrently; join_all keeps call order.
let outputs = join_all(tool_calls.iter().map(|tc| {
self.dispatch_tool(
&tc.function.name,
&tc.function.arguments,
&sender,
conversation_id,
)
}))
.await;
for (tc, result) in tool_calls.iter().zip(outputs) {
let entry =
HistoryEntry::tool(tool_output_text(&result), tc.id.clone(), &tc.function.name);
history.push(entry.clone());
tool_results.push(entry);
}
}
Ok(AgentStep {
message,
usage,
finish_reason,
tool_calls,
tool_results,
})
}
/// Route one tool invocation through the configured dispatcher.
///
/// Returns `Err` with a descriptive message when the agent has no
/// dispatcher configured; otherwise forwards the dispatcher's result.
async fn dispatch_tool(
    &self,
    name: &str,
    args: &str,
    sender: &str,
    conversation_id: Option<u64>,
) -> Result<String, String> {
    match &self.dispatcher {
        Some(dispatcher) => {
            dispatcher
                .dispatch(name, args, &self.config.name, sender, conversation_id)
                .await
        }
        None => Err(format!(
            "tool '{name}' called but no tool dispatcher configured"
        )),
    }
}
/// Classify how a final step ended: a non-empty text reply counts as a
/// response, anything else as no action.
fn stop_reason(step: &AgentStep) -> AgentStopReason {
    let text = step.message.content.as_ref().and_then(|v| v.as_str());
    match text {
        Some(s) if !s.is_empty() => AgentStopReason::TextResponse,
        _ => AgentStopReason::NoAction,
    }
}
/// Drive [`Self::run_stream`] to completion, forwarding every event to
/// `events` and returning the payload of the terminal `Done` event.
///
/// If the stream somehow ends without a `Done`, a synthetic error
/// response is returned instead.
pub async fn run(
    &self,
    history: &mut Vec<HistoryEntry>,
    events: mpsc::UnboundedSender<AgentEvent>,
    conversation_id: Option<u64>,
    tool_choice: Option<ToolChoice>,
) -> AgentResponse {
    let event_stream = self.run_stream(history, conversation_id, None, tool_choice);
    let mut event_stream = std::pin::pin!(event_stream);
    let mut captured = None;
    while let Some(event) = event_stream.next().await {
        if let AgentEvent::Done(resp) = &event {
            captured = Some(resp.clone());
        }
        // The receiver may have hung up; dropped events are acceptable.
        let _ = events.send(event);
    }
    match captured {
        Some(response) => response,
        None => AgentResponse {
            final_response: None,
            iterations: 0,
            stop_reason: AgentStopReason::Error("stream ended without Done".into()),
            steps: vec![],
            model: self.model_name(),
        },
    }
}
/// Run the agent loop as a stream of [`AgentEvent`]s.
///
/// Each iteration streams one model response — emitting paired
/// Text/Thinking Start/Delta/End events as chunks arrive — then runs any
/// requested tool calls concurrently, appending the assistant message
/// and tool results to `history`. Exactly one `AgentEvent::Done` ends
/// the stream: on a plain (no-tool-call) reply, on no action, on a
/// provider stream error, or once `max_iterations` is exhausted.
///
/// `steer_rx`, when provided, lets the caller inject an extra user
/// message between iterations; `tool_choice` overrides the configured
/// default for every request in this run.
pub fn run_stream<'a>(
&'a self,
history: &'a mut Vec<HistoryEntry>,
conversation_id: Option<u64>,
mut steer_rx: Option<watch::Receiver<Option<String>>>,
tool_choice: Option<ToolChoice>,
) -> impl Stream<Item = AgentEvent> + 'a {
stream! {
let mut steps = Vec::new();
let max = self.config.max_iterations;
let model_name = self.model_name();
for _ in 0..max {
// Pick up a steering message, if one arrived since the last
// iteration; it is attributed to the most recent user sender.
let steer_content = steer_rx.as_mut().and_then(|rx| {
rx.has_changed().ok()?.then(|| rx.borrow_and_update().clone())?
});
if let Some(content) = steer_content {
let sender = last_sender(history);
history.push(HistoryEntry::user_with_sender(&content, &sender));
yield AgentEvent::UserSteered { content };
}
let request = self.build_request(history, tool_choice.as_ref());
let mut builder = MessageBuilder::new(Role::Assistant);
let mut finish_reason = None;
let mut last_usage: Option<Usage> = None;
let mut stream_error = None;
let mut tool_begin_emitted = false;
// Tracks which delta segment (text vs. thinking) is currently
// open so Start/End events always pair up.
#[derive(PartialEq)]
enum OpenSegment { None, Text, Thinking }
let mut open = OpenSegment::None;
{
let mut chunk_stream = std::pin::pin!(self.model.stream_ct(request));
while let Some(result) = chunk_stream.next().await {
match result {
Ok(chunk) => {
if let Some(text) = chunk.content() {
// Close an open thinking segment before starting text.
if open != OpenSegment::Text {
if open == OpenSegment::Thinking {
yield AgentEvent::ThinkingEnd;
}
yield AgentEvent::TextStart;
open = OpenSegment::Text;
}
yield AgentEvent::TextDelta(text.to_owned());
}
if let Some(reason) = chunk.reasoning_content() {
// Close an open text segment before starting thinking.
if open != OpenSegment::Thinking {
if open == OpenSegment::Text {
yield AgentEvent::TextEnd;
}
yield AgentEvent::ThinkingStart;
open = OpenSegment::Thinking;
}
yield AgentEvent::ThinkingDelta(reason.to_owned());
}
if let Some(r) = chunk.finish_reason() {
finish_reason = Some(r.clone());
}
if chunk.usage.is_some() {
last_usage = chunk.usage.clone();
}
// Fold the chunk into the accumulated assistant message.
builder.accept(&chunk);
// Emit ToolCallsBegin once, as soon as any tool call
// starts appearing in the partial message.
if !tool_begin_emitted {
let calls = builder.peek_tool_calls();
if !calls.is_empty() {
tool_begin_emitted = true;
yield AgentEvent::ToolCallsBegin(calls);
}
}
}
Err(e) => {
stream_error = Some(e.to_string());
break;
}
}
}
// Close whichever segment is still open (also on stream error).
match open {
OpenSegment::Text => yield AgentEvent::TextEnd,
OpenSegment::Thinking => yield AgentEvent::ThinkingEnd,
OpenSegment::None => {}
}
}
if let Some(e) = stream_error {
yield AgentEvent::Done(AgentResponse {
final_response: None,
iterations: steps.len(),
stop_reason: AgentStopReason::Error(e),
steps,
model: model_name.clone(),
});
return;
}
let message = builder.build();
let tool_calls: Vec<ToolCall> =
message.tool_calls.clone().unwrap_or_default();
let content = message
.content
.as_ref()
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_owned());
let usage = last_usage.unwrap_or_default();
let has_tool_calls = !tool_calls.is_empty();
// Neither text nor tool calls: nothing to act on, stop here.
// Note the assistant message is NOT pushed to history in this case.
if content.is_none() && !has_tool_calls {
yield AgentEvent::Done(AgentResponse {
final_response: None,
iterations: steps.len(),
stop_reason: AgentStopReason::NoAction,
steps,
model: model_name.clone(),
});
return;
}
history.push(HistoryEntry::from_message(message.clone()));
let mut tool_results = Vec::new();
if has_tool_calls {
let sender = last_sender(history);
yield AgentEvent::ToolCallsStart(tool_calls.clone());
// Dispatch all tool calls concurrently; each future carries its
// original index plus a wall-clock duration for the event.
let mut pending: FuturesUnordered<_> = tool_calls
.iter()
.enumerate()
.map(|(idx, tc)| {
let fut = self.dispatch_tool(
&tc.function.name,
&tc.function.arguments,
&sender,
conversation_id,
);
async move {
let start = std::time::Instant::now();
let out = fut.await;
(idx, out, start.elapsed().as_millis() as u64)
}
})
.collect();
// Results are yielded as they complete (any order), but buffered
// so history entries land in the original call order.
let mut buffered: Vec<Option<Result<String, String>>> =
vec![None; tool_calls.len()];
while let Some((idx, output, duration_ms)) = pending.next().await {
let call_id = tool_calls[idx].id.clone();
yield AgentEvent::ToolResult {
call_id,
output: output.clone(),
duration_ms,
};
buffered[idx] = Some(output);
}
for (tc, out) in tool_calls.iter().zip(buffered.into_iter()) {
let out = out.expect("FuturesUnordered drained every slot");
let entry = HistoryEntry::tool(
tool_output_text(&out),
tc.id.clone(),
&tc.function.name,
);
history.push(entry.clone());
tool_results.push(entry);
}
yield AgentEvent::ToolCallsComplete;
}
// Context compaction: when the estimated token count exceeds the
// threshold, replace the entire history with a single summary entry.
// NOTE(review): `continue` runs even when `compact` returned None,
// so this iteration's step is never pushed onto `steps` in either
// case — confirm dropping the step record here is intentional.
if let Some(threshold) = self.config.compact_threshold
&& Self::estimate_tokens(history) > threshold
{
if let Some(summary) = self.compact(history).await {
yield AgentEvent::Compact { summary: summary.clone() };
*history = vec![HistoryEntry::user(&summary)];
yield AgentEvent::TextStart;
yield AgentEvent::TextDelta(
"\n[context compacted]\n".to_owned(),
);
yield AgentEvent::TextEnd;
}
continue;
}
let step = AgentStep {
message,
usage,
finish_reason,
tool_calls,
tool_results,
};
if !step.tool_calls.is_empty() {
// Tool results were appended: loop again so the model can react.
steps.push(step);
} else {
// Plain text (or empty) reply: this iteration ends the run.
let stop_reason = Self::stop_reason(&step);
steps.push(step);
yield AgentEvent::Done(AgentResponse {
final_response: content,
iterations: steps.len(),
stop_reason,
steps,
model: model_name.clone(),
});
return;
}
}
// Iteration budget exhausted: report whatever text the last step had.
let final_response = steps
.last()
.and_then(|s| s.message.content.as_ref())
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_owned());
yield AgentEvent::Done(AgentResponse {
final_response,
iterations: steps.len(),
stop_reason: AgentStopReason::MaxIterations,
steps,
model: model_name,
});
}
}
}