use lellm_core::{ChatRequest, ChatResponse, LlmError, Message};
use lellm_provider::ResolvedModel;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use super::LoopState;
use super::context::{ContextBudget, estimate_text};
use super::event::AgentEvent;
use super::fallback::{FallbackAction, FallbackContext, FallbackStrategy};
use super::tools::ToolExecutor;
/// Runs `op` repeatedly until it succeeds or the fallback strategy gives up.
///
/// After every failed call the error is logged and handed to `fallback`
/// together with the 1-based attempt number, the caller-supplied `iteration`
/// and a copy of `messages`. The strategy then decides what happens next:
///
/// * `FallbackAction::Retry` — the attempt counter is bumped and `op` is
///   invoked again.
/// * `FallbackAction::Abort` — the error from the most recent attempt is
///   returned to the caller.
///
/// # Errors
///
/// Returns the last `LlmError` produced by `op` once the strategy aborts.
pub async fn execute_with_fallback<T, F, Fut>(
    fallback: &Arc<dyn FallbackStrategy>,
    mut op: F,
    iteration: usize,
    messages: &[Message],
) -> Result<T, LlmError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, LlmError>>,
{
    let mut attempt: usize = 1;
    loop {
        // Success leaves the loop immediately; only failures reach the strategy.
        let failure = match op().await {
            Ok(value) => return Ok(value),
            Err(failure) => failure,
        };

        tracing::warn!(
            attempt = attempt,
            error = %failure,
            "provider operation failed, fallback handling"
        );

        // The conversation is copied for each failed attempt because the
        // context takes an owned value converted from a `Vec<Message>`.
        let ctx = FallbackContext {
            error: &failure,
            attempt,
            iterations: iteration,
            conversation: messages.to_vec().into(),
        };

        let decision = fallback.handle(&ctx).await;
        match decision {
            FallbackAction::Retry => attempt += 1,
            FallbackAction::Abort => return Err(failure),
        }
    }
}
/// Sends `event` on the agent event channel.
///
/// Returns `true` when the send succeeded and `false` when it reported an
/// error; the error value itself is discarded.
pub async fn emit(tx: &Sender<AgentEvent>, event: AgentEvent) -> bool {
    let delivery = tx.send(event).await;
    delivery.is_ok()
}
/// Executes `tool_calls` one after another, bracketing each with events.
///
/// For every call, in order: an `AgentEvent::ToolStart` is emitted, the call
/// is run through `executor.execute_with_emission`, an `AgentEvent::ToolEnd`
/// carrying the result is emitted, and the result is turned into a tool-result
/// `Message`.
///
/// Returns `Some(messages)` with one message per call, or `None` as soon as
/// any event fails to send. In that case the remaining calls are not executed
/// and the messages gathered so far are dropped.
pub async fn emit_and_execute_tools(
    tx: &Sender<AgentEvent>,
    executor: &ToolExecutor,
    tool_calls: &[lellm_core::ToolCall],
) -> Option<Vec<Message>> {
    let mut collected = Vec::new();

    for call in tool_calls {
        let started = AgentEvent::ToolStart {
            tool_call_id: call.id.clone(),
            name: call.name.clone(),
        };
        if !emit(tx, started).await {
            return None;
        }

        let outcome = executor.execute_with_emission(call, tx).await;

        // The event takes its own copy; the original is still needed below to
        // build the tool-result message.
        let finished = AgentEvent::ToolEnd {
            tool_call_id: call.id.clone(),
            result: outcome.clone(),
        };
        if !emit(tx, finished).await {
            return None;
        }

        collected.push(Message::tool_result(call, &outcome));
    }

    Some(collected)
}
pub fn build_partial_response(
text_buffer: String,
thinking_buffer: String,
redacted_buffer: Option<String>,
) -> ChatResponse {
let mut content: Vec<lellm_core::ContentBlock> = Vec::new();
if !thinking_buffer.is_empty() {
content.push(lellm_core::ContentBlock::Thinking(
lellm_core::ThinkingBlock {
thinking: thinking_buffer,
redacted: redacted_buffer,
},
));
}
if !text_buffer.is_empty() {
content.push(lellm_core::ContentBlock::Text(lellm_core::TextBlock {
text: text_buffer,
}));
}
if content.is_empty() {
content.push(lellm_core::ContentBlock::Text(lellm_core::TextBlock {
text: String::new(),
}));
}
ChatResponse::new(
content,
lellm_core::TokenUsage::default(),
serde_json::json!(null),
)
}
/// Outcome of consuming one provider stream in `process_stream_iteration`.
///
/// Marked `#[must_use]` so that an outcome cannot be dropped silently.
#[must_use]
pub(super) enum StreamIterResult {
/// The response contained tool calls and they were executed; `response` is
/// the assistant response that carried those calls.
Continue { response: ChatResponse },
/// The response contained no tool calls and the `LoopEnd` event was sent.
Complete { response: ChatResponse },
/// An event could not be sent on the event channel. `response` is `None`
/// when a provider event failed to send, and `Some` when the failure
/// happened after the full response had already been assembled.
Cancelled { response: Option<ChatResponse> },
/// The estimated output tokens of this round exceeded the output limit;
/// `response` holds the content buffered before the cut.
OutputBudgetExceeded { response: ChatResponse },
/// The estimated reasoning tokens of this round exceeded the reasoning
/// limit; `response` holds the content buffered before the cut.
ReasoningBudgetExceeded { response: ChatResponse },
}
/// Drains one provider stream and turns it into a [`StreamIterResult`].
///
/// Events are handled in arrival order:
///
/// * `Token` — its estimated size is added to the per-round output counter.
///   If the counter exceeds `max_output_tokens` the stream is abandoned and
///   `OutputBudgetExceeded` is returned with the content buffered so far;
///   the delta that crossed the limit is neither buffered nor emitted.
///   Otherwise the token is appended to `text_buffer`.
/// * `ThinkingDelta` — handled the same way against `max_reasoning_tokens`
///   (no limit when `None`), appending to `thinking_buffer` and
///   `redacted_buffer`. These events are forwarded to `tx` only when
///   `stream_thinking` is `true`.
/// * `ResponseComplete` — the buffers and the tool calls are assembled into a
///   `ChatResponse`. With tool calls present, the assistant turn and the tool
///   results are recorded on `state` and `Continue` is returned; without tool
///   calls a `LoopEnd` event is emitted and `Complete` is returned.
///
/// Every event that is not suppressed is forwarded to `tx` as
/// `AgentEvent::Provider`. If any send fails, `Cancelled` is returned.
///
/// # Errors
///
/// Propagates any error yielded by the stream, and returns
/// `LlmError::UnexpectedEof` if the stream ends without a `ResponseComplete`.
async fn process_stream_iteration(
    tx: &Sender<AgentEvent>,
    executor: &ToolExecutor,
    state: &mut LoopState,
    stream: &mut lellm_provider::ProviderStream,
    text_buffer: &mut String,
    thinking_buffer: &mut String,
    redacted_buffer: &mut Option<String>,
    budget: &ContextBudget,
    max_output_tokens: u32,
    max_reasoning_tokens: Option<u32>,
    stream_thinking: bool,
) -> Result<StreamIterResult, LlmError> {
    use futures_util::StreamExt;

    // Strictly-greater budget check. The running estimate is widened to `u64`
    // (lossless for `usize`) instead of being narrowed to `u32`, because a
    // narrowing cast wraps modulo 2^32 and could hide an exceeded budget.
    fn over_limit(used: usize, limit: u32) -> bool {
        (used as u64) > u64::from(limit)
    }

    let mut round_output_tokens: usize = 0;
    let mut round_reasoning_tokens: usize = 0;

    while let Some(result) = stream.next().await {
        let ev = result?;

        match &ev {
            lellm_provider::ProviderEvent::Token { token } => {
                round_output_tokens += estimate_text(token);
                if over_limit(round_output_tokens, max_output_tokens) {
                    tracing::warn!(
                        round_output_tokens,
                        max_output_tokens,
                        "single-round output budget exceeded, cutting stream"
                    );
                    // Returned before the push below, so the offending token
                    // is not part of the partial response.
                    let response = build_partial_response(
                        text_buffer.clone(),
                        thinking_buffer.clone(),
                        redacted_buffer.clone(),
                    );
                    return Ok(StreamIterResult::OutputBudgetExceeded { response });
                }
                text_buffer.push_str(token);
            }
            lellm_provider::ProviderEvent::ThinkingDelta { thinking, redacted } => {
                // Redacted reasoning counts against the same budget.
                round_reasoning_tokens += estimate_text(thinking)
                    + redacted.as_ref().map_or(0, |r| estimate_text(r));
                if let Some(limit) = max_reasoning_tokens {
                    if over_limit(round_reasoning_tokens, limit) {
                        tracing::warn!(
                            round_reasoning_tokens,
                            max_reasoning_tokens = limit,
                            "single-round reasoning budget exceeded, cutting stream"
                        );
                        let response = build_partial_response(
                            text_buffer.clone(),
                            thinking_buffer.clone(),
                            redacted_buffer.clone(),
                        );
                        return Ok(StreamIterResult::ReasoningBudgetExceeded { response });
                    }
                }
                thinking_buffer.push_str(thinking);
                if let Some(r) = redacted {
                    // Creates the buffer on first use, then appends.
                    redacted_buffer.get_or_insert_with(String::new).push_str(r);
                }
            }
            lellm_provider::ProviderEvent::Start { .. }
            | lellm_provider::ProviderEvent::ResponseComplete { .. } => {}
        }

        // Thinking deltas are still buffered above, but only forwarded to the
        // consumer when thinking streaming is enabled.
        let suppressed = !stream_thinking
            && matches!(&ev, lellm_provider::ProviderEvent::ThinkingDelta { .. });
        if !suppressed && !emit(tx, AgentEvent::Provider(ev.clone())).await {
            return Ok(StreamIterResult::Cancelled { response: None });
        }

        if let lellm_provider::ProviderEvent::ResponseComplete { tool_calls, usage } = ev {
            let pending_tool_calls = tool_calls;
            let usage_val = usage.unwrap_or_default();

            // Block order: thinking, text, then tool calls.
            // NOTE(review): redacted-only reasoning (empty `thinking_buffer`,
            // `Some` redacted buffer) is dropped here — confirm that is intended.
            let mut content: Vec<lellm_core::ContentBlock> = Vec::new();
            if !thinking_buffer.is_empty() {
                content.push(lellm_core::ContentBlock::Thinking(
                    lellm_core::ThinkingBlock {
                        thinking: thinking_buffer.clone(),
                        redacted: redacted_buffer.clone(),
                    },
                ));
            }
            if !text_buffer.is_empty() {
                content.push(lellm_core::ContentBlock::Text(lellm_core::TextBlock {
                    text: text_buffer.clone(),
                }));
            }
            content.extend(
                pending_tool_calls
                    .iter()
                    .map(|tc| lellm_core::ContentBlock::ToolCall(tc.clone())),
            );
            let response = ChatResponse::new(content, usage_val, serde_json::json!(null));

            if !pending_tool_calls.is_empty() {
                // Record the assistant turn before running the tools.
                state.push_assistant(response.content.clone());
                state.add_output_from_content(&response.content);
                state.add_tool_calls(pending_tool_calls.len());

                let results = match emit_and_execute_tools(tx, executor, &pending_tool_calls).await
                {
                    Some(results) => results,
                    None => {
                        return Ok(StreamIterResult::Cancelled {
                            response: Some(response),
                        });
                    }
                };
                state.push_tool_results(results, budget);
                tracing::debug!(
                    iteration = state.iterations,
                    tool_calls = pending_tool_calls.len(),
                    "tool-use stream iteration"
                );
                return Ok(StreamIterResult::Continue { response });
            }

            // No tool calls: this response ends the loop.
            state.add_output_from_content(&response.content);
            let loop_end = AgentEvent::LoopEnd {
                result: state.finish_complete(response.clone()),
            };
            if !emit(tx, loop_end).await {
                return Ok(StreamIterResult::Cancelled {
                    response: Some(response),
                });
            }
            return Ok(StreamIterResult::Complete { response });
        }
    }

    // The stream ended without ever delivering `ResponseComplete`.
    Err(LlmError::UnexpectedEof)
}
/// What `do_stream_iteration` reports for a single streaming attempt.
pub(super) struct StreamIterationResult {
/// On success, the iteration outcome paired with the `LoopState` the attempt
/// ran against; otherwise the error from opening or processing the stream.
pub(super) result: Result<(StreamIterResult, LoopState), LlmError>,
/// `false` only when opening the provider stream failed; `true` once a
/// stream was obtained, however processing ended.
pub(super) stream_started: bool,
}
/// Opens a provider stream for `req` and processes it to completion.
///
/// All arguments are taken by value. `state` is mutated while the stream is
/// processed and handed back inside the result only on success; if processing
/// fails, the state is dropped along with the attempt.
///
/// The reasoning limit is read from `req.max_reasoning_tokens`.
///
/// The returned `stream_started` flag is `false` when the provider refused to
/// open the stream and `true` in every other case, including errors raised
/// while the stream was being consumed.
pub(super) async fn do_stream_iteration(
    model: ResolvedModel,
    tx: Sender<AgentEvent>,
    executor: ToolExecutor,
    state: LoopState,
    req: ChatRequest,
    budget: ContextBudget,
    max_output_tokens: u32,
    stream_thinking: bool,
) -> StreamIterationResult {
    let max_reasoning_tokens = req.max_reasoning_tokens;

    let mut stream = match model.provider.stream(&req).await {
        Err(open_err) => {
            return StreamIterationResult {
                result: Err(open_err),
                stream_started: false,
            };
        }
        Ok(opened) => opened,
    };

    // Fresh accumulation buffers for this attempt.
    let mut text_buffer = String::new();
    let mut thinking_buffer = String::new();
    let mut redacted_buffer: Option<String> = None;
    let mut working_state = state;

    let outcome = process_stream_iteration(
        &tx,
        &executor,
        &mut working_state,
        &mut stream,
        &mut text_buffer,
        &mut thinking_buffer,
        &mut redacted_buffer,
        &budget,
        max_output_tokens,
        max_reasoning_tokens,
        stream_thinking,
    )
    .await;

    // The state travels with the outcome only when processing succeeded.
    let result = match outcome {
        Ok(step) => Ok((step, working_state)),
        Err(err) => Err(err),
    };

    StreamIterationResult {
        result,
        stream_started: true,
    }
}