use lellm_core::{ChatResponse, LlmError, Message};
use lellm_provider::ResolvedModel;
use super::config::{ToolUseConfig, ToolUseDeps, build_request_messages_inner, empty_response};
use super::context::{
CompactionResult, ContextBudget, ContextCompactor, LocalCompactor, estimate_reasoning_block,
estimate_text, estimate_tokens,
};
use super::event::{AgentEvent, AgentStream, StopReason};
use super::fallback::{FallbackAction, FallbackContext};
use super::iteration::{StreamIterResult, do_stream_iteration, emit, execute_with_fallback};
use super::tools::ToolExecutor;
/// Mutable bookkeeping for a single run of the tool-use loop.
///
/// Holds the conversation sent to the provider plus the counters the loop
/// consults for its stop conditions (iteration cap, output and reasoning budgets).
#[derive(Debug, Clone)]
pub struct LoopState {
    /// Conversation history sent with each request.
    pub messages: Vec<Message>,
    /// Running token estimate for `messages`; compared against the context
    /// budget in [`LoopState::compact`].
    pub estimated_tokens: usize,
    /// Number of rounds started so far (see [`LoopState::next_iteration`]).
    pub iterations: usize,
    /// Number of tool calls recorded via [`LoopState::add_tool_calls`].
    pub tool_calls_executed: usize,
    /// Cumulative output tokens across rounds.
    pub total_output_tokens: usize,
    /// Cumulative reasoning tokens across rounds.
    pub total_reasoning_tokens: usize,
}

impl LoopState {
    /// Creates a fresh state for `messages` with all counters at zero and the
    /// token estimate computed from the initial history.
    pub fn new(messages: Vec<Message>) -> Self {
        let estimated_tokens = estimate_tokens(&messages);
        Self {
            messages,
            estimated_tokens,
            iterations: 0,
            tool_calls_executed: 0,
            total_output_tokens: 0,
            total_reasoning_tokens: 0,
        }
    }

    /// Adds `tokens` to the cumulative output total.
    pub fn add_output_tokens(&mut self, tokens: usize) {
        self.total_output_tokens += tokens;
    }

    /// Adds `tokens` to the cumulative reasoning total.
    pub fn add_reasoning_tokens(&mut self, tokens: usize) {
        self.total_reasoning_tokens += tokens;
    }

    /// `true` when `max` is set and `total` has reached (not just passed) it.
    fn limit_reached(total: usize, max: Option<u32>) -> bool {
        matches!(max, Some(limit) if total >= limit as usize)
    }

    /// Returns `true` once the cumulative output reaches `max`; `None` means unlimited.
    pub fn exceeded_total_output(&self, max: Option<u32>) -> bool {
        Self::limit_reached(self.total_output_tokens, max)
    }

    /// Returns `true` once the cumulative reasoning reaches `max`; `None` means unlimited.
    pub fn exceeded_total_reasoning(&self, max: Option<u32>) -> bool {
        Self::limit_reached(self.total_reasoning_tokens, max)
    }

    /// Accounts for a response's content blocks.
    ///
    /// Text counts toward the output total and thinking blocks toward the
    /// reasoning total. Images and tool calls are not counted.
    pub fn add_output_from_content(&mut self, content: &[lellm_core::ContentBlock]) {
        for block in content {
            match block {
                lellm_core::ContentBlock::Text(t) => self.add_output_tokens(estimate_text(&t.text)),
                lellm_core::ContentBlock::Thinking(th) => {
                    self.add_reasoning_tokens(estimate_reasoning_block(th))
                }
                lellm_core::ContentBlock::Image { .. } | lellm_core::ContentBlock::ToolCall(_) => {}
            }
        }
    }

    /// Appends an assistant message and adds its estimated size to `estimated_tokens`.
    pub fn push_assistant(&mut self, content: Vec<lellm_core::ContentBlock>) {
        // Build the message once and estimate it in place. This avoids cloning
        // the whole content vector just to size it.
        let msg = Message::Assistant { content };
        self.estimated_tokens += estimate_tokens(std::slice::from_ref(&msg));
        self.messages.push(msg);
    }

    /// Appends tool-result messages, updating `estimated_tokens`.
    ///
    /// Successful (`is_error: false`) tool results have their content passed
    /// through `budget.truncate_tool_result_blocks`. A new message is only
    /// built when truncation actually changed something. Error results and
    /// other messages are appended unchanged.
    pub fn push_tool_results(
        &mut self,
        results: Vec<Message>,
        budget: &ContextBudget,
    ) {
        let results: Vec<Message> = results
            .into_iter()
            .map(|m| {
                if let Message::ToolResult {
                    ref tool_call_id,
                    is_error: false,
                    ref content,
                } = m
                {
                    let truncated = budget.truncate_tool_result_blocks(content);
                    if truncated != *content {
                        return Message::ToolResult {
                            tool_call_id: tool_call_id.clone(),
                            is_error: false,
                            content: truncated,
                        };
                    }
                }
                m
            })
            .collect();
        self.estimated_tokens += estimate_tokens(&results);
        self.messages.extend(results);
    }

    /// Adds `count` to the number of executed tool calls.
    pub fn add_tool_calls(&mut self, count: usize) {
        self.tool_calls_executed += count;
    }

    /// Marks the start of a new round.
    pub fn next_iteration(&mut self) {
        self.iterations += 1;
    }

    /// `true` once at least `max_iterations` rounds have been started.
    pub fn reached_max(&self, max_iterations: usize) -> bool {
        self.iterations >= max_iterations
    }

    /// Compacts the history with `compactor` if `budget` says it is needed.
    ///
    /// Returns `None` (leaving the state untouched) when no compaction is
    /// required. Otherwise replaces `messages` and `estimated_tokens` with the
    /// compactor's output and returns its full result.
    pub fn compact(
        &mut self,
        budget: &ContextBudget,
        compactor: &dyn ContextCompactor,
    ) -> Option<CompactionResult> {
        if !budget.should_compact(self.estimated_tokens) {
            return None;
        }
        let result = compactor.compact(&self.messages, budget);
        self.messages = result.messages.clone();
        self.estimated_tokens = result.after_tokens;
        Some(result)
    }

    /// Snapshots the current state into a [`ToolUseResult`] with the given
    /// stop reason and response.
    pub fn finish(&self, stop_reason: StopReason, response: ChatResponse) -> ToolUseResult {
        ToolUseResult {
            stop_reason,
            response,
            messages: self.messages.clone(),
            iterations: self.iterations,
            tool_calls_executed: self.tool_calls_executed,
        }
    }

    /// [`LoopState::finish`] with [`StopReason::Complete`].
    pub fn finish_complete(&self, response: ChatResponse) -> ToolUseResult {
        self.finish(StopReason::Complete, response)
    }

    /// [`LoopState::finish`] with [`StopReason::MaxIterationsReached`].
    pub fn finish_max_iterations(&self, response: ChatResponse) -> ToolUseResult {
        self.finish(StopReason::MaxIterationsReached, response)
    }

    /// [`LoopState::finish`] with [`StopReason::Cancelled`].
    pub fn finish_cancelled(&self, response: ChatResponse) -> ToolUseResult {
        self.finish(StopReason::Cancelled, response)
    }

    /// [`LoopState::finish`] with [`StopReason::OutputBudgetExceeded`].
    pub fn finish_output_budget(&self, response: ChatResponse) -> ToolUseResult {
        self.finish(StopReason::OutputBudgetExceeded, response)
    }

    /// [`LoopState::finish`] with [`StopReason::ReasoningBudgetExceeded`].
    pub fn finish_reasoning_budget(&self, response: ChatResponse) -> ToolUseResult {
        self.finish(StopReason::ReasoningBudgetExceeded, response)
    }
}
/// Outcome of a tool-use run: why it stopped, the response reported for it,
/// and a snapshot of the loop's history and counters at that moment.
#[derive(Debug, Clone)]
pub struct ToolUseResult {
    /// Reason the loop terminated.
    pub stop_reason: StopReason,
    /// Provider response reported as the run's result.
    pub response: ChatResponse,
    /// Conversation history held by the loop when it stopped.
    pub messages: Vec<Message>,
    /// Number of rounds that were started.
    pub iterations: usize,
    /// Number of tool calls that were dispatched.
    pub tool_calls_executed: usize,
}

impl ToolUseResult {
    /// Whether the run finished naturally.
    ///
    /// Only [`StopReason::Complete`] counts as success. Any other reason
    /// (e.g. iteration cap, cancellation, exhausted budgets) yields `false`.
    pub fn is_success(&self) -> bool {
        let stop = &self.stop_reason;
        matches!(stop, StopReason::Complete)
    }
}
#[derive(Clone)]
pub struct ToolUseLoop {
model: ResolvedModel,
executor: ToolExecutor,
config: ToolUseConfig,
deps: ToolUseDeps,
}
impl ToolUseLoop {
pub fn new(
model: ResolvedModel,
executor: ToolExecutor,
config: ToolUseConfig,
deps: ToolUseDeps,
) -> Self {
if config.stream_thinking {
let caps = model.provider.capabilities_for(&model.model);
if !caps.supports_stream_thinking {
tracing::warn!(
provider = %model.provider.provider_id(),
model = %model.model,
"stream_thinking=true but provider does not support thinking deltas; \
reasoning content will only be available in the final response"
);
}
}
Self {
model,
executor,
config,
deps,
}
}
pub fn simple(model: ResolvedModel, executor: ToolExecutor) -> Self {
Self::new(
model,
executor,
ToolUseConfig::default(),
ToolUseDeps::default(),
)
}
pub async fn execute(&self, messages: Vec<Message>) -> Result<ToolUseResult, LlmError> {
let initial_messages = build_request_messages_inner(&self.config, &messages)?;
let mut state = LoopState::new(initial_messages);
let mut last_response: Option<ChatResponse> = None;
let compactor: Box<dyn ContextCompactor> = Box::new(LocalCompactor::new());
loop {
if state.reached_max(self.config.max_iterations) {
return Ok(
state.finish_max_iterations(last_response.unwrap_or_else(empty_response))
);
}
state.next_iteration();
state.compact(&self.config.context_budget, &*compactor);
let req = super::config::build_request_inner_with_round(
&self.model,
&self.executor,
&state.messages,
self.config.max_output_tokens,
&self.config.request_options,
state.iterations,
);
let iteration = state.iterations;
let msg_snapshot = state.messages.clone();
let response = execute_with_fallback(
&self.deps.fallback,
|| self.model.provider.call(&req),
iteration,
&msg_snapshot,
)
.await?;
last_response = Some(response.clone());
if let Some(limit) = self.config.request_options.max_reasoning_tokens {
let round_reasoning: usize = response
.content
.iter()
.filter_map(|b| match b {
lellm_core::ContentBlock::Thinking(th) => Some(estimate_reasoning_block(th)),
_ => None,
})
.sum();
if round_reasoning > limit as usize {
tracing::warn!(
round_reasoning,
max_reasoning_tokens = limit,
"single-round reasoning budget exceeded (non-stream, soft limit)"
);
return Ok(state.finish_reasoning_budget(response));
}
}
state.add_output_from_content(&response.content);
if state.exceeded_total_output(self.config.max_total_output_tokens) {
return Ok(state.finish_output_budget(response));
}
if state.exceeded_total_reasoning(self.config.max_total_reasoning_tokens) {
return Ok(state.finish_reasoning_budget(response));
}
if !response.has_tool_calls() {
return Ok(state.finish_complete(response));
}
let tool_calls: Vec<_> = response.tool_calls().cloned().collect();
state.push_assistant(response.content.clone());
state.add_tool_calls(tool_calls.len());
let batch = self.executor.execute_batch(&tool_calls).await;
if batch.panicked {
tracing::warn!("tool batch task panicked — error results filled in by executor");
}
state.push_tool_results(batch.results, &self.config.context_budget);
tracing::debug!(
iteration = state.iterations,
tool_calls = tool_calls.len(),
"tool-use loop iteration"
);
}
}
pub fn execute_stream(&self, messages: Vec<Message>) -> AgentStream {
let (tx, rx) = tokio::sync::mpsc::channel(32);
let model = self.model.clone();
let executor = self.executor.clone();
let config = self.config.clone();
let deps = self.deps.clone();
tokio::spawn(async move {
let initial_messages = match build_request_messages_inner(&config, &messages) {
Ok(m) => m,
Err(e) => {
let _ = tokio::sync::mpsc::Sender::send(
&tx,
AgentEvent::LoopError {
error: e,
iterations: 0,
},
)
.await;
return;
}
};
let mut state = LoopState::new(initial_messages);
let mut last_response: Option<ChatResponse> = None;
let compactor: Box<dyn ContextCompactor> = Box::new(LocalCompactor::new());
loop {
if state.reached_max(config.max_iterations) {
let _ = emit(
&tx,
AgentEvent::LoopEnd {
result: state.finish_max_iterations(
last_response.unwrap_or_else(empty_response),
),
},
)
.await;
return;
}
state.next_iteration();
if let Some(compact_result) = state.compact(&config.context_budget, &*compactor) {
let _ = emit(
&tx,
AgentEvent::ContextCompacted {
before_tokens: compact_result.before_tokens,
after_tokens: compact_result.after_tokens,
removed_messages: compact_result.removed_messages,
},
)
.await;
}
let req = super::config::build_request_inner_with_round(
&model,
&executor,
&state.messages,
config.max_output_tokens,
&config.request_options,
state.iterations,
);
let iteration = state.iterations;
let attempt_state = state.clone();
let mut attempt: usize = 1;
let result = loop {
let iter_result = do_stream_iteration(
model.clone(),
tx.clone(),
executor.clone(),
attempt_state.clone(),
req.clone(),
config.context_budget.clone(),
config.max_output_tokens,
config.stream_thinking,
)
.await;
match iter_result.result {
Ok(v) => break Ok(v),
Err(ref err) => {
tracing::warn!(
attempt = attempt,
error = %err,
stream_started = iter_result.stream_started,
"stream iteration failed, fallback handling"
);
if iter_result.stream_started {
let e: LlmError = err.clone();
break Err(e);
}
let ctx = FallbackContext {
error: err,
attempt,
iterations: iteration,
conversation: std::sync::Arc::from(
attempt_state.messages.as_slice(),
),
};
match deps.fallback.handle(&ctx).await {
FallbackAction::Retry => {
attempt += 1;
}
FallbackAction::Abort => {
break Err(err.clone());
}
}
}
}
};
let result = match result {
Ok((r, s)) => {
state = s;
Ok(r)
}
Err(e) => Err(e),
};
if state.exceeded_total_output(config.max_total_output_tokens) {
let _ = emit(
&tx,
AgentEvent::LoopEnd {
result: state.finish_output_budget(last_response.unwrap_or_else(empty_response)),
},
)
.await;
return;
}
if state.exceeded_total_reasoning(config.max_total_reasoning_tokens) {
let _ = emit(
&tx,
AgentEvent::LoopEnd {
result: state.finish_reasoning_budget(last_response.unwrap_or_else(empty_response)),
},
)
.await;
return;
}
match result {
Ok(StreamIterResult::Continue { response, .. }) => {
last_response = Some(response);
}
Ok(StreamIterResult::Complete { response }) => {
let _ = emit(
&tx,
AgentEvent::LoopEnd {
result: state.finish_complete(response),
},
)
.await;
return;
}
Ok(StreamIterResult::Cancelled { response }) => {
let resp = response
.or(last_response.take())
.unwrap_or_else(empty_response);
let _ = emit(
&tx,
AgentEvent::LoopEnd {
result: state.finish_cancelled(resp),
},
)
.await;
return;
}
Ok(StreamIterResult::OutputBudgetExceeded { response }) => {
tracing::warn!(
total_output_tokens = state.total_output_tokens,
"single-round output budget exceeded, stopping agent"
);
let _ = emit(
&tx,
AgentEvent::LoopEnd {
result: state.finish_output_budget(response),
},
)
.await;
return;
}
Ok(StreamIterResult::ReasoningBudgetExceeded { response }) => {
tracing::warn!(
total_reasoning_tokens = state.total_reasoning_tokens,
"single-round reasoning budget exceeded, stopping agent"
);
let _ = emit(
&tx,
AgentEvent::LoopEnd {
result: state.finish_reasoning_budget(response),
},
)
.await;
return;
}
Err(e) => {
let _ = emit(
&tx,
AgentEvent::LoopError {
error: e,
iterations: state.iterations,
},
)
.await;
return;
}
}
}
});
rx
}
}