pub mod conversation;
mod request_builder;
mod response_handler;
pub mod tui;
pub use conversation::ConversationHistory;
pub use request_builder::RequestBuilder;
pub use response_handler::ResponseHandler;
use std::io::IsTerminal;
use crate::api::LlmClient::Anthropic;
use crate::api::{CreateMessageRequest, ImageSource, LlmClient, MessageContentBlock, MorphClient};
use crate::config::{ModelConfig, NORMAL_MODE_MESSAGE, SAFE_MODE_MESSAGE};
use crate::error::{Result, SofosError};
use crate::mcp::McpManager;
use crate::session::{DisplayMessage, HistoryManager, SessionMetadata, SessionState};
use crate::tools::ToolExecutor;
use crate::tools::image::{ImageLoader, ImageReference, extract_image_references};
use crate::ui::{UI, set_safe_mode_cursor_style};
use colored::Colorize;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::sleep;
pub type SteerQueue = Arc<Mutex<Vec<String>>>;
/// User-facing settings that shape a single [`Repl`] session.
pub struct ReplConfig {
    /// Model identifier forwarded to the LLM backend.
    pub model: String,
    /// Maximum number of tokens the model may generate per response.
    pub max_tokens: u32,
    /// Whether extended thinking / reasoning is requested.
    pub enable_thinking: bool,
    /// Token budget for thinking; must be < `max_tokens` when thinking is enabled
    /// (enforced in `Repl::new`).
    pub thinking_budget: u32,
    /// Whether the REPL starts in safe (restricted-tools) mode.
    pub safe_mode: bool,
}
impl ReplConfig {
    /// Assemble a [`ReplConfig`] from its individual settings.
    ///
    /// Performs no validation; `Repl::new` checks the invariants
    /// (e.g. the thinking budget vs. `max_tokens`).
    pub fn new(
        model: String,
        max_tokens: u32,
        enable_thinking: bool,
        thinking_budget: u32,
        safe_mode: bool,
    ) -> Self {
        Self { model, max_tokens, enable_thinking, thinking_budget, safe_mode }
    }
}
/// The interactive REPL: owns the LLM client, tool execution, session
/// state, and a dedicated tokio runtime used to drive async calls from
/// the synchronous UI loop.
pub struct Repl {
    /// Backend LLM client (Anthropic or other providers).
    client: LlmClient,
    /// Executes tool calls requested by the model.
    tool_executor: ToolExecutor,
    /// Persists and lists saved sessions on disk.
    history_manager: HistoryManager,
    /// Resolves image references (local paths / URLs) in user input.
    image_loader: ImageLoader,
    /// Terminal rendering helpers.
    ui: UI,
    /// Model name, token limits, and thinking configuration.
    model_config: ModelConfig,
    /// Conversation, display messages, token counters, session id.
    session_state: SessionState,
    /// Mirrors `ReplConfig::safe_mode`; toggled at runtime via commands.
    safe_mode: bool,
    /// Tool schemas advertised to the model; refreshed when safe mode flips.
    available_tools: Vec<crate::api::Tool>,
    /// Set by the UI (ESC) to abort an in-flight request.
    interrupt_flag: Arc<AtomicBool>,
    /// Queued mid-turn steering messages from the user.
    steer_queue: SteerQueue,
    /// One-shot banner text shown when the TUI starts.
    startup_banner: String,
    /// Runtime used to `block_on` async work from sync methods.
    runtime: tokio::runtime::Runtime,
}
impl Repl {
pub fn new(
client: LlmClient,
config: ReplConfig,
workspace: PathBuf,
morph_client: Option<MorphClient>,
) -> Result<Self> {
let runtime = tokio::runtime::Runtime::new()
.map_err(|e| SofosError::Config(format!("Failed to create async runtime: {}", e)))?;
let mcp_manager = runtime.block_on(async {
match McpManager::new(workspace.clone()).await {
Ok(manager) => Some(manager),
Err(e) => {
eprintln!("Warning: Failed to initialize MCP manager: {}", e);
None
}
}
});
let tool_executor = ToolExecutor::new(
workspace.clone(),
morph_client,
mcp_manager,
config.safe_mode,
std::io::stdin().is_terminal(),
)?;
let has_morph = tool_executor.has_morph();
let has_code_search = tool_executor.has_code_search();
let history_manager = HistoryManager::new(workspace.clone())?;
let image_loader = ImageLoader::new(workspace.clone())?;
let custom_instructions = history_manager.load_custom_instructions()?;
if custom_instructions.is_some() {
eprintln!("{}", "Loaded custom instructions".bright_green());
}
if config.enable_thinking && config.thinking_budget >= config.max_tokens {
return Err(SofosError::Config(format!(
"thinking_budget ({}) must be less than max_tokens ({})",
config.thinking_budget, config.max_tokens
)));
}
let mut conversation =
ConversationHistory::with_features(has_morph, has_code_search, custom_instructions);
if config.safe_mode {
conversation.add_user_message(SAFE_MODE_MESSAGE.to_string());
set_safe_mode_cursor_style()?;
}
let session_id = HistoryManager::generate_session_id();
let session_state = SessionState::new(session_id, conversation);
let model_config = ModelConfig::new(
config.model,
config.max_tokens,
config.enable_thinking,
config.thinking_budget,
);
let ui = UI::new();
let available_tools = runtime.block_on(async { tool_executor.get_available_tools().await });
Ok(Self {
client,
tool_executor,
history_manager,
image_loader,
ui,
model_config,
session_state,
safe_mode: config.safe_mode,
available_tools,
interrupt_flag: Arc::new(AtomicBool::new(false)),
steer_queue: Arc::new(Mutex::new(Vec::new())),
startup_banner: String::new(),
runtime,
})
}
    /// Hand ownership of the REPL to the TUI event loop and run it to
    /// completion.
    pub fn run(self) -> Result<()> {
        tui::run(self)
    }
    /// Set the one-shot banner text shown when the TUI starts.
    pub fn set_startup_banner(&mut self, text: String) {
        self.startup_banner = text;
    }
    /// Consume the startup banner, leaving an empty string behind so it is
    /// only displayed once.
    pub(crate) fn take_startup_banner(&mut self) -> String {
        std::mem::take(&mut self.startup_banner)
    }
    /// Replace the interrupt flag with one shared with the UI layer, so an
    /// ESC press can abort in-flight requests.
    pub fn install_interrupt_flag(&mut self, flag: Arc<AtomicBool>) {
        self.interrupt_flag = flag;
    }
    /// Replace the steering queue with one shared with the UI layer, so the
    /// user can inject guidance mid-turn.
    pub fn install_steer_queue(&mut self, queue: SteerQueue) {
        self.steer_queue = queue;
    }
    /// The current model name, for display in the UI.
    pub fn model_label(&self) -> String {
        self.model_config.model.clone()
    }
    /// List metadata for all sessions persisted in the workspace.
    pub fn list_saved_sessions(&self) -> Result<Vec<SessionMetadata>> {
        self.history_manager.list_sessions()
    }
pub fn status_snapshot(&self) -> tui::event::StatusSnapshot {
let reasoning = if matches!(self.client, Anthropic(_)) {
if self.model_config.enable_thinking {
format!("thinking: {} tok", self.model_config.thinking_budget)
} else {
"thinking: off".to_string()
}
} else if self.model_config.enable_thinking {
"effort: high".to_string()
} else {
"effort: low".to_string()
};
tui::event::StatusSnapshot {
model: self.model_config.model.clone(),
mode: if self.safe_mode {
tui::event::Mode::Safe
} else {
tui::event::Mode::Normal
},
reasoning,
input_tokens: self.session_state.total_input_tokens,
output_tokens: self.session_state.total_output_tokens,
}
}
    /// Send one user turn to the model and drive the response to completion.
    ///
    /// Pipeline: extract inline image references from `user_input`, attach
    /// any `pasted_images`, record the turn in conversation and display
    /// history, compact if needed, issue the API request (raced against the
    /// ESC interrupt flag), recover from image-related 400 errors by
    /// stripping images and retrying once, then hand the response to a
    /// `ResponseHandler` for the tool-use loop.
    pub fn process_message(
        &mut self,
        user_input: &str,
        pasted_images: Vec<crate::clipboard::PastedImage>,
    ) -> Result<()> {
        let (remaining_text, image_refs) = extract_image_references(user_input);
        let has_images = !image_refs.is_empty() || !pasted_images.is_empty();
        if !image_refs.is_empty() {
            println!(
                "{} Detected {} image reference(s)",
                "🔍".bright_cyan(),
                image_refs.len()
            );
        }
        // Build content blocks only when images are involved; plain text
        // uses the simpler add_user_message path below.
        let content_blocks = if has_images {
            let mut blocks: Vec<MessageContentBlock> = Vec::new();
            // Clipboard images arrive already base64-encoded.
            for pasted in &pasted_images {
                blocks.push(MessageContentBlock::Image {
                    source: ImageSource::Base64 {
                        media_type: pasted.media_type.clone(),
                        data: pasted.base64_data.clone(),
                    },
                    cache_control: None,
                });
            }
            // Load referenced images; failures become text notices so the
            // model still learns what was attempted.
            let mut failed_images: Vec<String> = Vec::new();
            for img_ref in &image_refs {
                match self.image_loader.load_image(img_ref) {
                    Ok(source) => {
                        // Translate the tool-layer image source into the API type.
                        let api_source = match source {
                            crate::tools::image::ImageSource::Base64 { media_type, data } => {
                                ImageSource::Base64 { media_type, data }
                            }
                            crate::tools::image::ImageSource::Url { url } => {
                                ImageSource::Url { url }
                            }
                        };
                        blocks.push(MessageContentBlock::Image {
                            source: api_source,
                            cache_control: None,
                        });
                        let path_str = match img_ref {
                            ImageReference::LocalPath(p) => format!("local: {}", p),
                            ImageReference::WebUrl(u) => format!("url: {}", u),
                        };
                        println!("{} {}", "📷 Image loaded:".bright_cyan(), path_str.dimmed());
                    }
                    Err(e) => {
                        let path_str = match img_ref {
                            ImageReference::LocalPath(p) => p.clone(),
                            ImageReference::WebUrl(u) => u.clone(),
                        };
                        let error_msg = format!("[Failed to load image '{}': {}]", path_str, e);
                        failed_images.push(error_msg);
                        println!(
                            "\n{} {}\n",
                            "⚠️ Failed to load image:".bright_yellow().bold(),
                            e
                        );
                    }
                }
            }
            // Merge the remaining prompt text with any load-failure notices.
            let mut text_parts: Vec<String> = Vec::new();
            if !remaining_text.trim().is_empty() {
                text_parts.push(remaining_text.clone());
            }
            if !failed_images.is_empty() {
                text_parts.extend(failed_images);
            }
            if !text_parts.is_empty() {
                blocks.push(MessageContentBlock::Text {
                    text: text_parts.join("\n\n"),
                    cache_control: None,
                });
            } else if blocks.is_empty() {
                // Nothing usable at all — refuse rather than send an empty turn.
                return Err(SofosError::ToolExecution(
                    "No valid images or text in message".to_string(),
                ));
            }
            Some(blocks)
        } else {
            None
        };
        if let Some(blocks) = content_blocks {
            self.session_state.conversation.add_user_with_blocks(blocks);
        } else {
            self.session_state
                .conversation
                .add_user_message(user_input.to_string());
        }
        // The display history always records the raw input text.
        self.session_state
            .display_messages
            .push(DisplayMessage::UserMessage {
                content: user_input.to_string(),
            });
        // Opportunistic compaction; failures are deliberately ignored here
        // (a failed compaction must not block the user's turn).
        if self.session_state.conversation.needs_compaction() {
            let _ = self.compact_conversation(false);
        }
        let initial_request = self.build_initial_request();
        let runtime = &self.runtime;
        // Streaming is currently disabled; the branch is kept for when it
        // is re-enabled. NOTE(review): the streaming path relies on the
        // client honoring the interrupt flag internally — there is no
        // select!-based race there, unlike the non-streaming path.
        let use_streaming = false;
        let client_for_retry = self.client.clone();
        let response_result: Result<_> = if use_streaming {
            let printer = Arc::new(crate::ui::StreamPrinter::new());
            let p_text = printer.clone();
            let p_think = printer.clone();
            let interrupt = Arc::clone(&self.interrupt_flag);
            let client = self.client.clone();
            let req = initial_request;
            let result = runtime.block_on(async move {
                client
                    .create_message_streaming(
                        req,
                        move |t| p_text.on_text_delta(t),
                        move |t| p_think.on_thinking_delta(t),
                        interrupt,
                    )
                    .await
            });
            printer.finish();
            result
        } else {
            let interrupt_flag = Arc::clone(&self.interrupt_flag);
            let client = self.client.clone();
            let req = initial_request;
            // Race the request against the ESC interrupt flag; on interrupt
            // the task is aborted and we surface SofosError::Interrupted.
            let mut request_handle = runtime.spawn(async move { client.create_message(req).await });
            let result = runtime.block_on(async {
                tokio::select! {
                    res = &mut request_handle => {
                        match res {
                            Ok(inner) => inner,
                            Err(e) => Err(SofosError::Join(format!("{}", e)))
                        }
                    }
                    _ = Self::wait_for_interrupt(Arc::clone(&interrupt_flag)) => {
                        request_handle.abort();
                        Err(SofosError::Interrupted)
                    }
                }
            });
            // If the user interrupted, record it and end the turn cleanly.
            if self.interrupt_flag.load(Ordering::Relaxed) {
                self.handle_initial_interrupt();
                return Ok(());
            }
            result
        };
        let response = match response_result {
            Ok(resp) => resp,
            Err(e) => {
                if let SofosError::Api(ref msg) = e {
                    // Heuristic detection of "the API could not fetch an
                    // image" 400 errors, based on message substrings.
                    let is_400_error = msg.contains("400");
                    let is_image_error = msg.contains("Unable to download")
                        || msg.contains("invalid_request_error")
                        || msg.contains("verify the URL");
                    let current_has_images = !image_refs.is_empty();
                    // The offending image may live in ANY prior message, not
                    // just the current turn.
                    let conversation_has_images = self
                        .session_state
                        .conversation
                        .messages()
                        .iter()
                        .any(|msg| {
                            use crate::api::{MessageContent, MessageContentBlock};
                            if let MessageContent::Blocks { content } = &msg.content {
                                content
                                    .iter()
                                    .any(|block| matches!(block, MessageContentBlock::Image { .. }))
                            } else {
                                false
                            }
                        });
                    let has_images = current_has_images || conversation_has_images;
                    if is_400_error && is_image_error && has_images {
                        println!(
                            "\n{} One or more image URLs in the conversation could not be loaded by the API\n",
                            "⚠️ Image loading error:".bright_yellow().bold()
                        );
                        // Keep a backup so a failed retry can restore the
                        // conversation exactly as it was.
                        let conversation_backup =
                            self.session_state.conversation.messages().to_vec();
                        self.session_state.conversation.remove_last_message();
                        let messages = self.session_state.conversation.messages();
                        // Strip every image block; drop messages that would
                        // become empty as a result.
                        let mut cleaned_messages = Vec::new();
                        for msg in messages {
                            use crate::api::{Message, MessageContent, MessageContentBlock};
                            let cleaned_msg = match &msg.content {
                                MessageContent::Blocks { content } => {
                                    let filtered_blocks: Vec<MessageContentBlock> = content
                                        .iter()
                                        .filter(|block| {
                                            !matches!(block, MessageContentBlock::Image { .. })
                                        })
                                        .cloned()
                                        .collect();
                                    if filtered_blocks.is_empty() {
                                        continue;
                                    } else {
                                        Message {
                                            role: msg.role.clone(),
                                            content: MessageContent::Blocks {
                                                content: filtered_blocks,
                                            },
                                        }
                                    }
                                }
                                _ => msg.clone(),
                            };
                            cleaned_messages.push(cleaned_msg);
                        }
                        self.session_state.conversation.clear();
                        self.session_state
                            .conversation
                            .restore_messages(cleaned_messages);
                        // Tell the model why the images vanished.
                        let error_message = if !image_refs.is_empty() {
                            "[SYSTEM ERROR: Image URLs in your message could not be loaded and have been removed from the conversation.]"
                        } else {
                            "[SYSTEM ERROR: Image URLs from a previous message could not be loaded and have been removed from the conversation. You can continue normally.]"
                        }.to_string();
                        self.session_state
                            .conversation
                            .add_user_message(error_message);
                        let new_request = self.build_initial_request();
                        println!("{}", "Retrying request without images...".dimmed());
                        println!();
                        match runtime
                            .block_on(async { client_for_retry.create_message(new_request).await })
                        {
                            Ok(resp) => resp,
                            Err(retry_err) => {
                                // Retry failed too: restore the original
                                // conversation and record the failure.
                                self.session_state.conversation.clear();
                                self.session_state
                                    .conversation
                                    .restore_messages(conversation_backup);
                                self.session_state
                                    .conversation
                                    .add_assistant_with_blocks(vec![
                                        crate::api::MessageContentBlock::Text {
                                            text: format!(
                                                "[Image loading failed and retry also failed: {}. \
                                                Your message is preserved above.]",
                                                retry_err
                                            ),
                                            cache_control: None,
                                        },
                                    ]);
                                return Err(retry_err);
                            }
                        }
                    } else {
                        // Non-image API error: record it in the conversation
                        // so the transcript stays coherent, then propagate.
                        self.session_state
                            .conversation
                            .add_assistant_with_blocks(vec![
                                crate::api::MessageContentBlock::Text {
                                    text: format!(
                                        "[API error: {}. I was unable to process your request.]",
                                        msg
                                    ),
                                    cache_control: None,
                                },
                            ]);
                        return Err(e);
                    }
                } else {
                    // Non-API error (I/O, join, …): same record-then-propagate.
                    self.session_state
                        .conversation
                        .add_assistant_with_blocks(vec![crate::api::MessageContentBlock::Text {
                            text: format!(
                                "[System error: {}. I was unable to process your request.]",
                                e
                            ),
                            cache_control: None,
                        }]);
                    return Err(e);
                }
            }
        };
        self.session_state
            .add_tokens(response.usage.input_tokens, response.usage.output_tokens);
        // Hand off to the response handler, which owns the tool-use loop.
        // It works on a clone of the conversation; we copy it back after.
        let mut handler = ResponseHandler::new(
            self.client.clone(),
            self.tool_executor.clone(),
            self.session_state.conversation.clone(),
            self.model_config.model.clone(),
            self.model_config.max_tokens,
            self.model_config.enable_thinking,
            self.model_config.thinking_budget,
            self.available_tools.clone(),
            use_streaming,
            Arc::clone(&self.interrupt_flag),
            Arc::clone(&self.steer_queue),
        );
        let result = runtime.block_on(handler.handle_response(
            response.content,
            &mut self.session_state.display_messages,
            &mut self.session_state.total_input_tokens,
            &mut self.session_state.total_output_tokens,
        ));
        self.session_state.conversation = handler.conversation().clone();
        match result {
            Ok(_) => Ok(()),
            // A user interrupt mid-turn is not an error for the caller.
            Err(SofosError::Interrupted) => Ok(()),
            Err(e) => {
                let error_text = format!(
                    "[System error during processing: {}. Previous actions are preserved above.]",
                    e
                );
                // Attribute the error note to whichever role keeps the
                // transcript alternating correctly.
                let last_role = self
                    .session_state
                    .conversation
                    .messages()
                    .last()
                    .map(|m| m.role.as_str());
                if last_role == Some("assistant") {
                    self.session_state.conversation.add_user_message(error_text);
                } else {
                    self.session_state
                        .conversation
                        .add_assistant_with_blocks(vec![crate::api::MessageContentBlock::Text {
                            text: error_text,
                            cache_control: None,
                        }]);
                }
                Err(e)
            }
        }
    }
    /// Assemble the first API request for a turn from the current model
    /// configuration, conversation, and available tools.
    fn build_initial_request(&self) -> CreateMessageRequest {
        RequestBuilder::new(
            &self.client,
            &self.model_config.model,
            self.model_config.max_tokens,
            &self.session_state.conversation,
            self.get_available_tools(),
            self.model_config.enable_thinking,
            self.model_config.thinking_budget,
        )
        .build()
    }
pub fn process_single_prompt(&mut self, prompt: &str) -> Result<()> {
let symbol = if self.safe_mode { "λ:" } else { "λ>" };
println!("{} {}", symbol.bright_green().bold(), prompt);
println!();
self.process_message(prompt, vec![])?;
self.save_current_session()?;
UI::display_session_summary(
&self.model_config.model,
self.session_state.total_input_tokens,
self.session_state.total_output_tokens,
);
Ok(())
}
pub fn save_current_session(&self) -> Result<()> {
if self.session_state.conversation.messages().is_empty() {
return Ok(());
}
self.history_manager.save_session(
&self.session_state.session_id,
self.session_state.conversation.messages(),
&self.session_state.display_messages,
self.session_state.conversation.system_prompt(),
)?;
Ok(())
}
    /// Return `(model_name, total_input_tokens, total_output_tokens)` for
    /// the end-of-session summary.
    pub fn get_session_summary(&self) -> (String, u32, u32) {
        (
            self.model_config.model.clone(),
            self.session_state.total_input_tokens,
            self.session_state.total_output_tokens,
        )
    }
pub fn handle_clear_command(&mut self) -> Result<()> {
let new_session_id = HistoryManager::generate_session_id();
self.session_state.conversation.clear();
self.session_state.clear(new_session_id);
self.session_state
.conversation
.add_user_message("The session history has been cleared".to_string());
println!("\n{}\n", "Conversation history cleared.".bright_yellow());
Ok(())
}
pub fn handle_resume_command(&mut self) -> Result<()> {
let sessions = self.history_manager.list_sessions()?;
if sessions.is_empty() {
println!("{}", "No saved sessions found.".yellow());
return Ok(());
}
let selected_id = crate::session::select_session(sessions)?;
if let Some(session_id) = selected_id {
self.load_session_by_id(&session_id)?;
println!(
"{} {}",
"Session loaded:".bright_green(),
"Continue your conversation below".dimmed()
);
println!();
}
Ok(())
}
pub fn handle_think_on(&mut self) {
self.model_config.set_thinking(true);
if matches!(self.client, Anthropic(_)) {
println!(
"\n{} (budget: {} tokens)\n",
"Extended thinking enabled.".bright_green(),
self.model_config.thinking_budget
);
} else {
let reasoning = Some(crate::api::Reasoning::enabled());
let effort: Option<&str> = reasoning.as_ref().map(|r| r.effort.as_str());
if let Some(e) = effort {
println!("\n{} {}\n", "Reasoning effort:".bright_green(), e);
}
}
}
pub fn handle_think_off(&mut self) {
self.model_config.set_thinking(false);
if matches!(self.client, Anthropic(_)) {
println!("\n{}\n", "Extended thinking disabled.".bright_yellow());
} else {
let reasoning = Some(crate::api::Reasoning::disabled());
let effort: Option<&str> = reasoning.as_ref().map(|r| r.effort.as_str());
if let Some(e) = effort {
println!("\n{} {}\n", "Reasoning effort:".bright_green(), e);
}
}
}
pub fn handle_think_status(&self) {
if self.model_config.enable_thinking {
println!(
"\n{} (budget: {} tokens)\n",
"Extended thinking is enabled".bright_green(),
self.model_config.thinking_budget
);
} else {
println!("\n{}\n", "Extended thinking is disabled".bright_yellow());
}
}
pub fn enable_safe_mode(&mut self) {
if !self.safe_mode {
self.safe_mode = true;
self.tool_executor.set_safe_mode(true);
self.refresh_available_tools();
self.session_state
.conversation
.add_user_message(SAFE_MODE_MESSAGE.to_string());
}
}
pub fn disable_safe_mode(&mut self) {
if self.safe_mode {
self.safe_mode = false;
self.tool_executor.set_safe_mode(false);
self.refresh_available_tools();
self.session_state
.conversation
.add_user_message(NORMAL_MODE_MESSAGE.to_string());
}
}
    /// Re-query the tool executor for the tool schemas to advertise
    /// (safe mode changes which tools are available).
    fn refresh_available_tools(&mut self) {
        // Field-level borrows: `self.runtime` and `self.tool_executor` are
        // borrowed disjointly, which is what lets this block_on compile.
        let tools = self
            .runtime
            .block_on(async { self.tool_executor.get_available_tools().await });
        self.available_tools = tools;
    }
    /// Load a saved session from disk, replacing the current conversation
    /// and display history, then re-render the transcript.
    ///
    /// Note: token counters are NOT restored — usage tracking continues
    /// from the current process's totals.
    pub fn load_session_by_id(&mut self, session_id: &str) -> Result<()> {
        let session = self.history_manager.load_session(session_id)?;
        self.session_state.session_id = session.id.clone();
        self.session_state.conversation.clear();
        self.session_state
            .conversation
            .restore_messages(session.api_messages.clone());
        self.session_state.display_messages = session.display_messages.clone();
        println!(
            "{} {} ({} messages)",
            "Loaded session:".bright_green(),
            session.id,
            session.api_messages.len()
        );
        println!();
        self.ui.display_session(&session)?;
        Ok(())
    }
fn handle_initial_interrupt(&mut self) {
println!(
"\n{}",
"Interrupted by user. You can now provide additional guidance.".bright_yellow()
);
println!();
let interrupt_msg = "INTERRUPT: The user pressed ESC to interrupt the request before receiving a response. \
They want to provide additional guidance or clarification. Wait for their next message.";
self.session_state
.conversation
.add_user_message(interrupt_msg.to_string());
self.session_state
.display_messages
.push(DisplayMessage::UserMessage {
content: "[Interrupted - no response received]".to_string(),
});
}
    /// Clone of the currently advertised tool schemas (for request building).
    fn get_available_tools(&self) -> Vec<crate::api::Tool> {
        self.available_tools.clone()
    }
pub fn compact_conversation(&mut self, force: bool) -> Result<bool> {
if !force && !self.session_state.conversation.needs_compaction() {
return Ok(false);
}
let tokens_before = self.session_state.conversation.estimate_total_tokens();
let split_point = self.session_state.conversation.compaction_split_point();
if split_point == 0 {
if force {
println!("\n{}\n", "Not enough messages to compact.".bright_yellow());
}
return Ok(false);
}
self.session_state
.conversation
.truncate_tool_results(split_point);
if !force && !self.session_state.conversation.needs_compaction() {
let tokens_after = self.session_state.conversation.estimate_total_tokens();
println!(
"\n{} {} -> {} tokens (tool results truncated)\n",
"Compacted:".bright_green(),
tokens_before,
tokens_after
);
return Ok(true);
}
let older_messages: Vec<_> =
self.session_state.conversation.messages()[..split_point].to_vec();
let serialized = ConversationHistory::serialize_messages_for_summary(&older_messages);
let summary_system = vec![crate::api::SystemPrompt::new_cached_with_ttl(
"You are a conversation summarizer. Produce a detailed but concise summary of the following \
coding assistant conversation. Preserve:\n\
1. All file paths mentioned or modified\n\
2. Key decisions made and their rationale\n\
3. Current state of any ongoing task\n\
4. Any errors encountered and how they were resolved\n\n\
Format as structured sections. Do NOT include raw file contents or verbose tool output — \
just what was done and decided."
.to_string(),
None,
)];
let summary_request = CreateMessageRequest {
model: self.model_config.model.clone(),
max_tokens: 4096,
messages: vec![crate::api::Message::user(serialized)],
system: Some(summary_system),
tools: None,
stream: None,
thinking: None,
reasoning: None,
};
let interrupt_flag = Arc::clone(&self.interrupt_flag);
let client = self.client.clone();
let mut request_handle = self
.runtime
.spawn(async move { client.create_message(summary_request).await });
let response_result = self.runtime.block_on(async {
tokio::select! {
res = &mut request_handle => {
match res {
Ok(inner) => inner,
Err(e) => Err(SofosError::Join(format!("{}", e)))
}
}
_ = Self::wait_for_interrupt(Arc::clone(&interrupt_flag)) => {
request_handle.abort();
Err(SofosError::Interrupted)
}
}
});
match response_result {
Ok(response) => {
let summary_text: String = response
.content
.iter()
.filter_map(|block| {
if let crate::api::ContentBlock::Text { text } = block {
Some(text.as_str())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
if summary_text.len() < 50 {
UI::print_warning(
"Compaction produced an insufficient summary. Falling back to trimming.",
);
self.session_state.conversation.fallback_trim();
return Ok(false);
}
self.session_state
.conversation
.replace_with_summary(summary_text, split_point);
self.session_state
.add_tokens(response.usage.input_tokens, response.usage.output_tokens);
let tokens_after = self.session_state.conversation.estimate_total_tokens();
println!(
"{} {} -> {} tokens (saved {}%)",
"Compacted:".bright_green(),
tokens_before,
tokens_after,
if tokens_before > 0 {
100 - (tokens_after * 100 / tokens_before)
} else {
0
}
);
Ok(true)
}
Err(SofosError::Interrupted) => {
UI::print_warning("Compaction interrupted. Falling back to trimming.");
self.session_state.conversation.fallback_trim();
Ok(false)
}
Err(e) => {
UI::print_warning(&format!(
"Compaction failed: {}. Falling back to trimming.",
e
));
self.session_state.conversation.fallback_trim();
Ok(false)
}
}
}
pub fn handle_compact_command(&mut self) -> Result<()> {
self.compact_conversation(true)?;
Ok(())
}
async fn wait_for_interrupt(flag: Arc<AtomicBool>) {
while !flag.load(Ordering::Relaxed) {
sleep(Duration::from_millis(50)).await;
}
}
}