use anyhow::Result;
use async_trait::async_trait;
use crossterm::event::{self, Event, KeyCode, KeyEventKind, KeyModifiers, MouseEventKind};
use ratatui::{Terminal, backend::CrosstermBackend};
use std::io;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{RwLock, mpsc};
use super::state::{AppState, GenerationStatus};
use super::tui_stream_event::TuiStreamEvent;
use crate::agents::{
ActionResult as AgentActionResult, AgentAction, SubagentProgress, SubagentResult,
collect_subagent_results, spawn_subagents,
};
use crate::constants::{UI_MOUSE_SCROLL_LINES, UI_POLL_INTERVAL_MS};
use crate::models::{
ChatMessage, ErrorCategory, MessageRole, Model, ModelConfig, StreamCallback, StreamEvent,
};
use crate::runtime::agent_loop::{
AgentObserver, LoopControl, MAX_AGENT_ITERATIONS, ModelCallOutput, run_agent_loop,
};
use crate::tui::App;
use crate::tui::render::render_ui;
use super::action_handler;
use super::command_handler;
use super::event_handler::{EventAction, handle_event};
pub struct TuiObserver<'a> {
pub app: &'a mut App,
pub terminal: &'a mut Terminal<CrosstermBackend<io::Stdout>>,
pub tx: mpsc::Sender<TuiStreamEvent>,
pub rx: &'a mut mpsc::Receiver<TuiStreamEvent>,
}
impl<'a> TuiObserver<'a> {
fn render(&mut self) -> Result<()> {
self.terminal.draw(|f| render_ui(f, self.app))?;
Ok(())
}
fn drain_events(&mut self) -> Result<bool> {
while event::poll(Duration::from_millis(0))? {
match event::read()? {
Event::Key(key) if key.kind == KeyEventKind::Press => match key.code {
KeyCode::Esc => return Ok(true),
KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => {
return Ok(true);
},
KeyCode::Char(c)
if key.modifiers.is_empty() || key.modifiers == KeyModifiers::SHIFT =>
{
self.app.input.insert(c);
},
KeyCode::Enter => {
if !self.app.input.is_empty() && !self.app.input.get().starts_with('/') {
let input = self.app.input.get().to_string();
self.app.operation_state.queue_message(input);
self.app.clear_input();
self.app.set_status("Message queued");
}
},
KeyCode::Backspace => {
self.app.input.backspace();
},
KeyCode::Delete => {
self.app.input.delete();
},
KeyCode::Left => self.app.input.move_left(),
KeyCode::Right => self.app.input.move_right(),
KeyCode::Home => self.app.input.move_home(),
KeyCode::End => self.app.input.move_end(),
KeyCode::PageUp => self.app.scroll_up(10),
KeyCode::PageDown => self.app.scroll_down(10),
_ => {},
},
Event::Mouse(m) => match m.kind {
MouseEventKind::ScrollUp => self.app.scroll_up(UI_MOUSE_SCROLL_LINES),
MouseEventKind::ScrollDown => self.app.scroll_down(UI_MOUSE_SCROLL_LINES),
_ => {},
},
Event::Paste(text) => {
let cleaned = text.replace('\r', "");
if !cleaned.is_empty() {
self.app.input.insert_str(&cleaned);
}
},
_ => {},
}
}
Ok(false)
}
fn commit_partial_response(&mut self) {
let partial = self.app.take_response();
if !partial.is_empty() {
self.app.add_message(MessageRole::Assistant, partial);
}
}
fn handle_stream_error(&mut self, user_error: &crate::models::UserFacingError) {
self.app.clear_response();
if user_error.message.contains("does not support thinking") {
self.app
.model_state
.set_reasoning(crate::models::ReasoningLevel::None);
let _ = crate::app::persist_reasoning_for_model(
&self.app.model_state.model_id,
crate::models::ReasoningLevel::None,
);
self.app
.set_status("Model does not support thinking — reasoning set to none");
self.app.add_message(
MessageRole::System,
"This model does not support thinking mode. Reasoning has been set to `none`. Please try your request again."
.to_string(),
);
return;
}
let lower = user_error.message.to_lowercase();
if lower.contains("does not support images")
|| lower.contains("images not supported")
|| lower.contains("does not support vision")
|| lower.contains("is not a multimodal model")
|| lower.contains("unsupported content type")
{
self.app.model_state.vision_supported = Some(false);
for msg in &mut self.app.session_state.messages {
msg.images = None;
}
self.app
.set_status("Model does not support images - disabled");
self.app.add_message(
MessageRole::System,
"This model does not support images. Image paste has been disabled for this session.".to_string()
);
return;
}
let status_prefix = match user_error.category {
ErrorCategory::Connection => "Connection",
ErrorCategory::Auth => "Auth",
ErrorCategory::Config => "Config",
ErrorCategory::NotFound => "Not Found",
ErrorCategory::Temporary => "Temporary",
ErrorCategory::Internal => "Error",
};
self.app
.set_status(format!("[{}] {}", status_prefix, user_error.summary));
let error_display = format!(
"{}\n\nSuggestion: {}",
user_error.message, user_error.suggestion
);
self.app.add_message(MessageRole::System, error_display);
}
}
#[async_trait]
impl<'a> AgentObserver for TuiObserver<'a> {
fn check_interrupt(&mut self) -> LoopControl {
let _ = self.render();
if self.app.app_state.generation_status() == Some(GenerationStatus::Sending)
&& let Some(start_time) = self.app.app_state.generation_start_time()
&& start_time.elapsed().as_secs() >= 1
{
self.app.transition_to_thinking();
}
match self.drain_events() {
Ok(true) => {
self.commit_partial_response();
self.app.set_status("Agent loop interrupted");
LoopControl::Interrupt
},
Ok(false) => {
if let Some(queued) = self.app.operation_state.take_queued_message() {
self.app.set_status("Processing queued message...");
LoopControl::InjectMessage(queued)
} else {
LoopControl::Continue
}
},
Err(_) => LoopControl::Continue,
}
}
fn on_status(&mut self, message: &str) {
self.app.set_status(message);
let _ = self.render();
}
fn on_tool_result(
&mut self,
tool_name: &str,
_tool_call_id: &str,
action: &AgentAction,
result: &AgentActionResult,
) {
if tool_name == "agent" {
return;
}
let action_display = match result {
AgentActionResult::Success { output, .. } => {
action_handler::build_action_display(action, output)
},
AgentActionResult::Error { error } => {
action_handler::build_error_display(action, error)
},
};
if let Some(last_msg) = self
.app
.session_state
.messages
.iter_mut()
.rev()
.find(|m| matches!(m.role, MessageRole::Assistant))
{
last_msg.actions.push(action_display);
}
let _ = self.render();
}
fn on_error(&mut self, error: &str) {
self.app.display_error_simple(error);
let _ = self.render();
}
fn on_generation_start(&mut self) {
}
fn on_generation_complete(&mut self, tokens: usize) {
self.app.set_final_tokens(tokens);
}
fn on_message_appended(&mut self, msg: &ChatMessage) {
self.app.commit_message(msg.clone());
}
async fn call_model(
&mut self,
model: Arc<RwLock<Box<dyn Model>>>,
_messages: &[ChatMessage],
config: &ModelConfig,
) -> Result<ModelCallOutput> {
{
use crate::app::instructions::{ReloadOutcome, refresh};
let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
let prior = self.app.instructions.take();
let (refreshed, outcome) = refresh(prior, &cwd);
self.app.instructions = refreshed;
match outcome {
ReloadOutcome::Reloaded {
old_tokens,
new_tokens,
} => {
self.app.set_status(format!(
"MERMAID.md updated — reloaded ({} → {} tokens)",
old_tokens, new_tokens
));
},
ReloadOutcome::LoadedFirst { tokens } => {
self.app
.set_status(format!("MERMAID.md created — loaded ({} tokens)", tokens));
},
ReloadOutcome::Removed => {
self.app
.set_status("MERMAID.md removed — context refreshed");
},
ReloadOutcome::Unchanged => {},
}
}
let trimmed = self.app.build_managed_message_history(
crate::constants::MAX_CONTEXT_TOKENS,
crate::constants::CONTEXT_RESERVE_TOKENS,
);
self.app.operation_state.accumulated_tool_calls.clear();
self.app.clear_response();
let tx_stream = self.tx.clone();
let tx_done = self.tx.clone();
let model_clone = Arc::clone(&model);
let mut effective_config = config.clone();
let reasoning_off = config.reasoning == crate::models::ReasoningLevel::None;
effective_config.hide_reasoning_trace = config.hide_reasoning_trace || reasoning_off;
effective_config.dynamic_system_suffix =
self.app.instructions.as_ref().map(|i| i.content.clone());
let handle = tokio::spawn(async move {
let in_thinking = Arc::new(std::sync::atomic::AtomicBool::new(false));
let sent_text = Arc::new(std::sync::atomic::AtomicBool::new(false));
let in_thinking_cb = Arc::clone(&in_thinking);
let sent_text_cb = Arc::clone(&sent_text);
let tx_stream_cb = tx_stream.clone();
let typed_callback: StreamCallback = Arc::new(move |event| match event {
StreamEvent::Text(text) => {
if in_thinking_cb.swap(false, std::sync::atomic::Ordering::Relaxed) {
let _ = tx_stream_cb
.try_send(TuiStreamEvent::Chunk("\n...done thinking.\n\n".to_string()));
}
sent_text_cb.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = tx_stream_cb.try_send(TuiStreamEvent::Chunk(text));
},
StreamEvent::Reasoning(chunk) => {
if !in_thinking_cb.swap(true, std::sync::atomic::Ordering::Relaxed) {
let _ = tx_stream_cb
.try_send(TuiStreamEvent::Chunk("Thinking...\n\n".to_string()));
}
if !chunk.text.is_empty() {
sent_text_cb.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = tx_stream_cb.try_send(TuiStreamEvent::Chunk(chunk.text));
}
},
StreamEvent::ToolCall(tc) => {
let _ = tx_stream_cb.try_send(TuiStreamEvent::ToolCalls(vec![tc]));
},
StreamEvent::Done { .. } => {},
});
let model = model_clone.read().await;
match model
.chat(&trimmed, &effective_config, Some(typed_callback))
.await
{
Ok(response) => {
if !sent_text.load(std::sync::atomic::Ordering::Relaxed)
&& !response.content.is_empty()
{
let _ = tx_done
.send(TuiStreamEvent::Chunk(response.content.clone()))
.await;
}
if let Some(ref tool_calls) = response.tool_calls
&& !tool_calls.is_empty()
{
let _ = tx_done
.send(TuiStreamEvent::ToolCalls(tool_calls.clone()))
.await;
}
let tokens = response.usage.map(|u| u.total_tokens).unwrap_or(0);
let _ = tx_done
.send(TuiStreamEvent::Done {
total_tokens: tokens,
})
.await;
},
Err(e) => {
let _ = tx_done
.send(TuiStreamEvent::Error(e.to_user_facing()))
.await;
},
}
});
if self.app.app_state.is_generating() {
self.app.update_abort_handle(handle.abort_handle());
self.app.transition_to_sending();
} else {
self.app.start_generation(handle.abort_handle());
}
loop {
let _ = self.render();
if self.drain_events()? {
let (abort, partial) = self.app.abort_generation();
if let Some(h) = abort {
h.abort();
}
if !partial.is_empty() {
self.app.add_message(MessageRole::Assistant, partial);
}
self.app.set_status("Generation stopped");
anyhow::bail!("Generation interrupted by user");
}
let event = match tokio::time::timeout(
Duration::from_millis(UI_POLL_INTERVAL_MS),
self.rx.recv(),
)
.await
{
Ok(Some(ev)) => ev,
Ok(None) => anyhow::bail!("Stream channel closed unexpectedly"),
Err(_) => continue, };
match event {
TuiStreamEvent::Chunk(text) => {
self.app.push_response(&text);
if self.app.app_state.generation_status() != Some(GenerationStatus::Streaming) {
self.app.transition_to_streaming();
}
},
TuiStreamEvent::ToolCalls(calls) => {
self.app
.operation_state
.accumulated_tool_calls
.extend(calls);
},
TuiStreamEvent::Done { total_tokens } => {
let content = self.app.take_response();
let tool_calls =
std::mem::take(&mut self.app.operation_state.accumulated_tool_calls);
return Ok(ModelCallOutput {
content,
tool_calls,
tokens: total_tokens,
});
},
TuiStreamEvent::Error(err) => {
self.handle_stream_error(&err);
anyhow::bail!("{}", err.summary);
},
}
}
}
async fn run_subagents(
&mut self,
specs: Vec<(String, String)>,
model: Arc<RwLock<Box<dyn Model>>>,
config: &ModelConfig,
) -> Vec<SubagentResult> {
let progress = Arc::new(Mutex::new(Vec::<SubagentProgress>::new()));
self.app.operation_state.active_subagents = Some(Arc::clone(&progress));
let (handles, overflow) = spawn_subagents(specs, model, config, Arc::clone(&progress));
let handle_count = handles.len();
self.app
.set_status(format!("Running {} agent(s)...", handle_count));
loop {
if let Some(ref p) = self.app.operation_state.active_subagents {
let total_tokens: usize = p
.lock()
.map(|v| v.iter().map(|a| a.tokens).sum())
.unwrap_or(0);
if let AppState::Generating {
tokens_received, ..
} = &mut self.app.app_state
{
*tokens_received = total_tokens;
}
}
let _ = self.render();
let mut interrupt = false;
if let Ok(true) = self.drain_events() {
interrupt = true;
}
if interrupt {
for h in &handles {
h.abort();
}
self.app.operation_state.clear_subagents();
return Vec::new();
}
if handles.iter().all(|h| h.is_finished()) {
break;
}
tokio::time::sleep(Duration::from_millis(UI_POLL_INTERVAL_MS)).await;
}
let results = collect_subagent_results(handles, overflow).await;
for result in &results {
let display = action_handler::build_agent_action_display(result);
if let Some(last_msg) = self
.app
.session_state
.messages
.iter_mut()
.rev()
.find(|m| matches!(m.role, MessageRole::Assistant))
{
last_msg.actions.push(display);
}
self.app.session_state.cumulative_tokens += result.tokens;
}
self.app
.set_status(format!("{} agent(s) completed", results.len()));
self.app.operation_state.clear_subagents();
results
}
}
pub async fn run_app_loop(
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
app: &mut App,
tx: mpsc::Sender<TuiStreamEvent>,
rx: &mut mpsc::Receiver<TuiStreamEvent>,
) -> Result<()> {
let mut title_task: Option<tokio::task::JoinHandle<Option<String>>> = None;
loop {
if let Some(ref task) = title_task
&& task.is_finished()
&& let Some(task) = title_task.take()
&& let Ok(Some(title)) = task.await
{
app.session_state.conversation_title = Some(title);
}
app.poll_mcp_init().await;
let viewport_height = terminal.size()?.height.saturating_sub(8);
terminal.draw(|f| render_ui(f, app))?;
if event::poll(Duration::from_millis(UI_POLL_INTERVAL_MS))? {
let event = event::read()?;
match handle_event(app, event, viewport_height)? {
EventAction::Continue => {},
EventAction::Quit => break,
EventAction::SubmitMessage(input) => {
run_turn(app, input, terminal, &tx, rx).await?;
while let Some(queued) = app.operation_state.take_queued_message() {
run_turn(app, queued, terminal, &tx, rx).await?;
}
app.stop_generation();
if title_task.is_none() {
title_task = app.spawn_title_generation();
}
},
EventAction::ExecuteCommand(command) => {
command_handler::handle_command(app, terminal, &command).await?;
},
}
}
if !app.running {
break;
}
}
Ok(())
}
async fn run_turn(
app: &mut App,
input: String,
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
tx: &mpsc::Sender<TuiStreamEvent>,
rx: &mut mpsc::Receiver<TuiStreamEvent>,
) -> Result<()> {
let images = app.attachment_state.take_base64_data();
app.add_message_with_images(MessageRole::User, input.clone(), images);
app.input.add_to_history(input.clone());
app.input.history_index = None;
app.input.history_buffer.clear();
if let Some(ref mut conv) = app.session_state.current_conversation {
conv.add_to_input_history(input);
if let Some(ref manager) = app.session_state.conversation_manager {
let conv_clone = conv.clone();
let manager_clone = manager.clone();
tokio::task::spawn_blocking(move || {
let _ = manager_clone.save_conversation(&conv_clone);
});
}
}
let mut observer = TuiObserver {
app,
terminal,
tx: tx.clone(),
rx,
};
let model = Arc::clone(&observer.app.model_state.model);
let config = observer.app.build_model_config();
observer.on_generation_start();
let first_call = observer.call_model(Arc::clone(&model), &[], &config).await;
let (content, initial_tool_calls) = match first_call {
Ok(out) => {
observer.on_generation_complete(out.tokens);
(out.content, out.tool_calls)
},
Err(_) => {
observer.app.stop_generation();
return Ok(());
},
};
if !content.is_empty() || !initial_tool_calls.is_empty() {
observer
.app
.add_assistant_message_with_tool_calls(content, initial_tool_calls.clone());
}
if !initial_tool_calls.is_empty() {
let mut messages = observer.app.build_managed_message_history(
crate::constants::MAX_CONTEXT_TOKENS,
crate::constants::CONTEXT_RESERVE_TOKENS,
);
let _ = run_agent_loop(
model,
&config,
&mut messages,
initial_tool_calls,
&mut observer,
MAX_AGENT_ITERATIONS,
)
.await?;
}
Ok(())
}