use anyhow::Result;
use crossterm::event;
use ratatui::{Terminal, backend::CrosstermBackend};
use std::io;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::mpsc;
use super::state::GenerationStatus;
use super::stream_event::StreamEvent;
use crate::agents::{
AgentAction, SubagentProgress, SubagentResult, collect_subagent_results,
format_subagent_tool_result, spawn_subagents,
};
use crate::constants::UI_POLL_INTERVAL_MS;
use crate::models::{ChatMessage, MessageRole, StreamCallback};
use crate::runtime::agent_loop::{MAX_AGENT_ITERATIONS, ToolExecutionResult};
use crate::tui::App;
use crate::tui::render::render_ui;
use super::action_handler;
use super::command_handler;
use super::event_handler::{EventAction, handle_event};
use super::stream_handler::{StreamStatus, process_stream_chunks};
fn spawn_model_call(app: &mut App, messages: Vec<ChatMessage>, tx: &mpsc::Sender<StreamEvent>) {
app.operation_state.accumulated_tool_calls.clear();
app.clear_response();
let model = app.model_state.model.clone();
let tx_stream = tx.clone();
let tx_done = tx.clone();
let config = app.build_model_config();
let handle = tokio::spawn(async move {
let sent_chunks = Arc::new(std::sync::atomic::AtomicBool::new(false));
let sent_flag = Arc::clone(&sent_chunks);
let callback: StreamCallback = Arc::new(move |chunk| {
sent_flag.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = tx_stream.try_send(StreamEvent::Chunk(chunk.to_string()));
});
let model = model.read().await;
match model.chat(&messages, &config, Some(callback)).await {
Ok(response) => {
if !sent_chunks.load(std::sync::atomic::Ordering::Relaxed)
&& !response.content.is_empty()
{
let _ = tx_done
.send(StreamEvent::Chunk(response.content.clone()))
.await;
}
if let Some(ref tool_calls) = response.tool_calls
&& !tool_calls.is_empty()
{
let _ = tx_done
.send(StreamEvent::ToolCalls(tool_calls.clone()))
.await;
}
let tokens = response.usage.map(|u| u.total_tokens).unwrap_or(0);
let _ = tx_done
.send(StreamEvent::Done {
total_tokens: tokens,
})
.await;
},
Err(e) => {
let _ = tx_done.send(StreamEvent::Error(e.to_user_facing())).await;
},
}
});
if app.app_state.is_generating() {
app.update_abort_handle(handle.abort_handle());
app.transition_to_sending();
} else {
app.start_generation(handle.abort_handle());
}
}
pub async fn run_app_loop(
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
app: &mut App,
tx: mpsc::Sender<StreamEvent>,
rx: &mut mpsc::Receiver<StreamEvent>,
) -> Result<()> {
let mut title_task: Option<tokio::task::JoinHandle<Option<String>>> = None;
loop {
if let Some(ref task) = title_task
&& task.is_finished()
&& let Some(task) = title_task.take()
&& let Ok(Some(title)) = task.await
{
app.session_state.conversation_title = Some(title);
}
app.poll_mcp_init().await;
let viewport_height = terminal.size()?.height.saturating_sub(8);
terminal.draw(|f| render_ui(f, app))?;
if app.app_state.generation_status() == Some(GenerationStatus::Sending)
&& let Some(start_time) = app.app_state.generation_start_time()
&& start_time.elapsed().as_secs() >= 1
{
app.transition_to_thinking();
}
if event::poll(std::time::Duration::from_millis(UI_POLL_INTERVAL_MS))? {
let event = event::read()?;
match handle_event(app, event, viewport_height)? {
EventAction::Continue => {
},
EventAction::Quit => {
break;
},
EventAction::SubmitMessage(input) => {
handle_message_submit(app, input, &tx, viewport_height).await;
},
EventAction::ExecuteCommand(command) => {
command_handler::handle_command(app, &command).await?;
},
}
}
match process_stream_chunks(app, rx).await? {
StreamStatus::Streaming => {
},
StreamStatus::Complete { tool_calls } => {
if !tool_calls.is_empty() {
run_agent_loop(app, tool_calls, &tx, rx, terminal).await?;
}
'queue_loop: while app.operation_state.has_queued_message() {
if let Some(queued_msg) = app.operation_state.take_queued_message() {
handle_message_submit(app, queued_msg, &tx, viewport_height).await;
loop {
if render_and_check_interrupt(terminal, app)? {
let cleared = app.operation_state.queued_message_count();
while app.operation_state.take_queued_message().is_some() {}
if cleared > 0 {
app.set_status(format!(
"Interrupted - cleared {} queued message(s)",
cleared
));
}
break 'queue_loop;
}
match process_stream_chunks(app, rx).await? {
StreamStatus::Streaming => {
},
StreamStatus::Complete {
tool_calls: new_tool_calls,
} => {
if !new_tool_calls.is_empty() {
run_agent_loop(app, new_tool_calls, &tx, rx, terminal)
.await?;
}
break; },
StreamStatus::Error(error) => {
app.display_error(&error.summary, &error.message);
app.stop_generation();
break 'queue_loop;
},
}
}
}
}
app.stop_generation();
if title_task.is_none() {
title_task = app.spawn_title_generation();
}
},
StreamStatus::Error(_error) => {
app.stop_generation();
},
}
if !app.running {
break;
}
}
Ok(())
}
async fn handle_message_submit(
app: &mut App,
input: String,
tx: &mpsc::Sender<StreamEvent>,
_viewport_height: u16,
) {
let images = app.attachment_state.take_base64_data();
app.add_message_with_images(MessageRole::User, input.clone(), images);
let messages = app.build_managed_message_history(
crate::constants::MAX_CONTEXT_TOKENS,
crate::constants::CONTEXT_RESERVE_TOKENS,
);
app.input.history.push_back(input.clone());
app.input.history_index = None;
app.input.history_buffer.clear();
if let Some(ref mut conv) = app.session_state.current_conversation {
conv.add_to_input_history(input.clone());
if let Some(ref manager) = app.session_state.conversation_manager {
let conv_clone = conv.clone();
let manager_clone = manager.clone();
tokio::task::spawn_blocking(move || {
let _ = manager_clone.save_conversation(&conv_clone);
});
}
}
spawn_model_call(app, messages, tx);
}
fn render_and_check_interrupt(
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
app: &mut App,
) -> Result<bool> {
terminal.draw(|f| render_ui(f, app))?;
while event::poll(Duration::from_millis(0))? {
match event::read()? {
event::Event::Key(key) if key.kind == crossterm::event::KeyEventKind::Press => {
match key.code {
crossterm::event::KeyCode::Esc => {
let (abort, partial) = app.abort_generation();
if let Some(h) = abort {
h.abort();
}
if !partial.is_empty() {
app.add_message(MessageRole::Assistant, partial);
}
app.set_status("Agent loop interrupted");
return Ok(true);
},
crossterm::event::KeyCode::Char('c')
if key
.modifiers
.contains(crossterm::event::KeyModifiers::CONTROL) =>
{
let (abort, partial) = app.abort_generation();
if let Some(h) = abort {
h.abort();
}
if !partial.is_empty() {
app.add_message(MessageRole::Assistant, partial);
}
app.set_status("Agent loop interrupted");
return Ok(true);
},
crossterm::event::KeyCode::Char(c) => {
if key.modifiers.is_empty()
|| key.modifiers == crossterm::event::KeyModifiers::SHIFT
{
app.input.insert(c);
}
},
crossterm::event::KeyCode::Enter => {
if !app.input.is_empty() && !app.input.get().starts_with(':') {
let input = app.input.get().to_string();
app.operation_state.queue_message(input);
app.clear_input();
app.set_status("Message queued");
}
},
crossterm::event::KeyCode::Backspace => {
app.input.backspace();
},
crossterm::event::KeyCode::Delete => {
app.input.delete();
},
crossterm::event::KeyCode::Left => {
app.input.move_left();
},
crossterm::event::KeyCode::Right => {
app.input.move_right();
},
crossterm::event::KeyCode::Home => {
app.input.move_home();
},
crossterm::event::KeyCode::End => {
app.input.move_end();
},
crossterm::event::KeyCode::PageUp => {
app.scroll_up(10);
},
crossterm::event::KeyCode::PageDown => {
app.scroll_down(10);
},
_ => {},
}
},
event::Event::Mouse(mouse) => match mouse.kind {
crossterm::event::MouseEventKind::ScrollUp => {
app.scroll_up(crate::constants::UI_MOUSE_SCROLL_LINES);
},
crossterm::event::MouseEventKind::ScrollDown => {
app.scroll_down(crate::constants::UI_MOUSE_SCROLL_LINES);
},
_ => {},
},
event::Event::Paste(text) => {
let cleaned = text.replace('\r', "");
if !cleaned.is_empty() {
app.input.insert_str(&cleaned);
}
},
_ => {},
}
}
Ok(false)
}
async fn run_agent_loop(
app: &mut App,
initial_tool_calls: Vec<crate::models::ToolCall>,
tx: &mpsc::Sender<StreamEvent>,
rx: &mut mpsc::Receiver<StreamEvent>,
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
) -> Result<()> {
let mut current_tool_calls = initial_tool_calls;
let mut iteration = 0;
while !current_tool_calls.is_empty() {
iteration += 1;
if iteration > MAX_AGENT_ITERATIONS {
app.set_status(format!(
"Agent loop exceeded {} iterations",
MAX_AGENT_ITERATIONS
));
return Ok(());
}
app.set_status(format!("Agent loop iteration {}", iteration));
app.poll_mcp_init().await;
if render_and_check_interrupt(terminal, app)? {
return Ok(());
}
if let Some(queued_msg) = app.operation_state.take_queued_message() {
app.set_status("Processing queued message...");
app.add_message(MessageRole::User, queued_msg.clone());
app.input.history.push_back(queued_msg);
current_tool_calls.clear();
let messages = app.build_managed_message_history(
crate::constants::MAX_CONTEXT_TOKENS,
crate::constants::CONTEXT_RESERVE_TOKENS,
);
app.clear_response();
spawn_model_call(app, messages, tx);
loop {
if render_and_check_interrupt(terminal, app)? {
return Ok(());
}
match process_stream_chunks(app, rx).await? {
StreamStatus::Streaming => {},
StreamStatus::Complete {
tool_calls: new_tool_calls,
} => {
if !new_tool_calls.is_empty() {
current_tool_calls = new_tool_calls;
}
break;
},
StreamStatus::Error(error) => {
app.display_error(&error.summary, &error.message);
return Ok(());
},
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
continue;
}
let (regular_calls, agent_calls): (Vec<&_>, Vec<&_>) = current_tool_calls
.iter()
.partition(|tc| tc.function.name != "agent");
let regular_owned: Vec<_> = regular_calls.into_iter().cloned().collect();
let mut results = if !regular_owned.is_empty() {
action_handler::execute_tool_calls_for_agent_loop(app, ®ular_owned).await
} else {
Vec::new()
};
if !agent_calls.is_empty() {
let agent_specs: Vec<(String, String)> = agent_calls
.iter()
.filter_map(|tc| match tc.to_agent_action() {
Ok(AgentAction::SpawnAgent {
prompt,
description,
}) => Some((prompt, description)),
_ => None,
})
.collect();
if !agent_specs.is_empty() {
let progress = Arc::new(Mutex::new(Vec::<SubagentProgress>::new()));
let config = app.build_model_config();
app.operation_state.active_subagents = Some(Arc::clone(&progress));
let (handles, overflow) = spawn_subagents(
agent_specs,
Arc::clone(&app.model_state.model),
&config,
Arc::clone(&progress),
);
let subagent_results =
poll_subagent_handles(handles, overflow, terminal, app).await?;
if subagent_results.is_empty() {
return Ok(());
}
for (i, agent_result) in subagent_results.iter().enumerate() {
let action_display =
action_handler::build_agent_action_display(agent_result);
if let Some(last_msg) = app
.session_state
.messages
.iter_mut()
.rev()
.find(|m| matches!(m.role, MessageRole::Assistant))
{
last_msg.actions.push(action_display);
}
let tool_call_id = agent_calls
.get(i)
.and_then(|tc| tc.id.clone())
.unwrap_or_else(|| format!("call_agent_{}", i));
let output = format_subagent_tool_result(agent_result);
results.push(ToolExecutionResult {
tool_call_id,
tool_name: "agent".to_string(),
action: AgentAction::SpawnAgent {
prompt: String::new(),
description: agent_result.description.clone(),
},
success: agent_result.success,
output,
images: None,
});
app.session_state.cumulative_tokens += agent_result.tokens;
}
}
}
if let Some(last_assistant) = app
.session_state
.messages
.iter_mut()
.rev()
.find(|m| matches!(m.role, MessageRole::Assistant))
{
last_assistant.tool_calls = Some(current_tool_calls.clone());
}
for result in &results {
if let Some(ref imgs) = result.images {
let mut tool_msg = crate::models::ChatMessage::tool(
&result.tool_call_id,
&result.tool_name,
&result.output,
);
tool_msg = tool_msg.with_images(imgs.clone());
app.commit_message(tool_msg);
} else {
app.add_tool_result(
result.tool_call_id.clone(),
result.tool_name.clone(),
result.output.clone(),
);
}
}
app.set_status(format!(
"Iteration {} - {} tool(s) executed, calling model...",
iteration,
results.len()
));
if render_and_check_interrupt(terminal, app)? {
return Ok(());
}
let messages = app.build_managed_message_history(
crate::constants::MAX_CONTEXT_TOKENS,
crate::constants::CONTEXT_RESERVE_TOKENS,
);
app.clear_response();
spawn_model_call(app, messages, tx);
loop {
if render_and_check_interrupt(terminal, app)? {
return Ok(());
}
match process_stream_chunks(app, rx).await? {
StreamStatus::Streaming => {
},
StreamStatus::Complete {
tool_calls: new_tool_calls,
} => {
if new_tool_calls.is_empty() {
app.set_status(format!(
"Agent loop complete after {} iterations",
iteration
));
return Ok(());
} else {
current_tool_calls = new_tool_calls;
break; }
},
StreamStatus::Error(error) => {
app.display_error(&error.summary, &error.message);
return Ok(());
},
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
Ok(())
}
async fn poll_subagent_handles(
handles: Vec<tokio::task::JoinHandle<SubagentResult>>,
overflow: Vec<SubagentResult>,
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
app: &mut App,
) -> Result<Vec<SubagentResult>> {
let handle_count = handles.len();
app.set_status(format!("Running {} agent(s)...", handle_count));
loop {
if let Some(ref progress) = app.operation_state.active_subagents {
let total_tokens: usize = progress
.lock()
.map(|p| p.iter().map(|a| a.tokens).sum())
.unwrap_or(0);
if let super::state::AppState::Generating {
tokens_received, ..
} = &mut app.app_state
{
*tokens_received = total_tokens;
}
}
if render_and_check_interrupt(terminal, app)? {
for h in &handles {
h.abort();
}
app.operation_state.clear_subagents();
return Ok(Vec::new());
}
if handles.iter().all(|h| h.is_finished()) {
break;
}
tokio::time::sleep(Duration::from_millis(UI_POLL_INTERVAL_MS)).await;
}
let results = collect_subagent_results(handles, overflow).await;
app.set_status(format!("{} agent(s) completed", results.len()));
app.operation_state.clear_subagents();
Ok(results)
}