use std::sync::Arc;
use kodegen_candle_agent::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tracing::{debug, warn};
use tokio::task::JoinSet;
use tokio::sync::Semaphore;
use crate::utils::errors::UtilsError;
use kodegen_mcp_schema::browser::BrowserNavigateArgs;
use crate::tools::BrowserNavigateTool;
use crate::page_extractor::{PageMetadata, extract_page_info};
/// One researched page: where it was found, what it said, and a generated summary.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResearchResult {
    /// URL reported by the navigation step for the loaded page.
    pub url: String,
    /// Page title as returned by the page-info extraction.
    pub title: String,
    /// Text content of the page (the body's inner text, or a Markdown
    /// conversion of the HTML when the inner text is not a string).
    pub content: String,
    /// LLM-generated summary of `content`.
    pub summary: String,
    /// UTC time at which this result was assembled.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Additional page metadata produced by the page-info extraction.
    pub metadata: PageMetadata,
}
/// Tunables for a research run.
///
/// NOTE(review): in the code visible in this file only `max_pages` and
/// `timeout_seconds` are actually read; the remaining fields are carried
/// along but not consulted here — confirm whether other modules use them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResearchOptions {
    /// Upper bound on how many search hits are processed.
    pub max_pages: usize,
    /// Intended crawl depth (not read by the visible code).
    pub max_depth: usize,
    /// Name of the search engine to use (not read by the visible code).
    pub search_engine: String,
    /// Whether links should be included (not read by the visible code).
    pub include_links: bool,
    /// Whether tables should be extracted (not read by the visible code).
    pub extract_tables: bool,
    /// Whether images should be extracted (not read by the visible code).
    pub extract_images: bool,
    /// Per-page navigation timeout in seconds; converted to milliseconds
    /// when passed to the navigation tool.
    pub timeout_seconds: u64,
}
impl Default for ResearchOptions {
    /// Defaults: 5 pages, depth 2, the "google" engine, links and tables on,
    /// images off, and a 60 second timeout.
    fn default() -> Self {
        let search_engine = String::from("google");
        Self {
            search_engine,
            max_pages: 5,
            max_depth: 2,
            timeout_seconds: 60,
            include_links: true,
            extract_tables: true,
            extract_images: false,
        }
    }
}
/// Drives a research run: searches, visits pages in a browser, and summarizes them.
///
/// Cloning is cheap: the browser manager and the visited-URL list sit behind
/// `Arc`s, so every clone shares the same visited list.
#[derive(Clone)]
pub struct DeepResearch {
    /// Browser manager handed to the navigation tool for each page visit.
    browser_manager: Arc<crate::BrowserManager>,
    /// Sampling temperature passed to the summarization agent.
    temperature: f64,
    /// Token limit passed to the summarization agent.
    max_tokens: u64,
    /// URLs already claimed during the current run; shared between clones.
    visited_urls: Arc<Mutex<Vec<String>>>,
}
impl DeepResearch {
/// Creates a researcher with an empty visited-URL list.
///
/// `temperature` and `max_tokens` are stored as-is and later forwarded to
/// the summarization agent.
pub fn new(
    browser_manager: Arc<crate::BrowserManager>,
    temperature: f64,
    max_tokens: u64,
) -> Self {
    let visited_urls = Arc::new(Mutex::new(Vec::new()));
    Self {
        visited_urls,
        browser_manager,
        temperature,
        max_tokens,
    }
}
/// Runs a full research pass for `query`, appending each finished page to `results`.
///
/// The visited-URL list is cleared first, then the search step supplies the
/// candidate URLs. At most `options.max_pages` of them are processed, each in
/// its own spawned task, with no more than three pages in flight at once.
/// Every successfully processed page is pushed into `results` and
/// `total_results` is incremented by one.
///
/// # Errors
///
/// Returns the search step's error if it fails. Failures of individual pages
/// are logged and otherwise ignored, so the call still returns `Ok(())`.
pub async fn research(
    &self,
    query: &str,
    options: Option<ResearchOptions>,
    results: Arc<tokio::sync::RwLock<Vec<ResearchResult>>>,
    total_results: Arc<std::sync::atomic::AtomicUsize>,
) -> Result<(), UtilsError> {
    use std::sync::atomic::Ordering;

    let options = options.unwrap_or_default();

    // The guard is a temporary, so the lock is released at the end of this statement.
    self.visited_urls.lock().await.clear();

    let candidate_urls = self.search_query(query, &options).await?;

    // Three permits: caps how many pages are processed concurrently.
    let limiter = Arc::new(Semaphore::new(3));
    let mut tasks = JoinSet::new();

    for url in candidate_urls.into_iter().take(options.max_pages) {
        let worker = self.clone();
        let task_options = options.clone();
        let sink = Arc::clone(&results);
        let counter = Arc::clone(&total_results);
        let limiter = Arc::clone(&limiter);

        tasks.spawn(async move {
            // Held until the task finishes so the concurrency cap covers the whole page visit.
            let _permit = limiter
                .acquire()
                .await
                .map_err(|e| UtilsError::UnexpectedError(format!("Semaphore error: {}", e)))?;

            match worker.process_url(&url, &task_options).await {
                Err(e) => {
                    warn!("Error processing URL {}: {}", url, e);
                    Err(e)
                }
                Ok(page_result) => {
                    // Write guard is dropped at the end of the statement, before the counter bump.
                    sink.write().await.push(page_result);
                    counter.fetch_add(1, Ordering::Release);
                    Ok::<(), UtilsError>(())
                }
            }
        });
    }

    // Drain every task. Per-page errors were already logged inside the task,
    // so only join-level failures are reported here.
    while let Some(joined) = tasks.join_next().await {
        if let Err(join_err) = joined {
            warn!("Research task panicked: {}", join_err);
        }
    }

    Ok(())
}
/// Placeholder for the web-search step.
///
/// Currently performs no search: it logs the query and always returns a
/// `BrowserError` explaining that the search functionality lives in another
/// package. `_options` is ignored.
async fn search_query(
    &self,
    query: &str,
    _options: &ResearchOptions,
) -> Result<Vec<String>, UtilsError> {
    debug!("web_search needs to call citescrape tool for query: {}", query);

    let message = format!(
        "web_search functionality moved to kodegen-tools-citescrape package. \
         Deep research needs to be updated to call citescrape web_search tool. \
         Query was: {}",
        query
    );
    Err(UtilsError::BrowserError(message))
}
/// Visits `url`, extracts its text, and builds a summarized [`ResearchResult`].
///
/// Steps:
/// 1. Claim the URL in the shared visited list (fails if it was already claimed).
/// 2. Navigate to it with the browser navigation tool, using
///    `options.timeout_seconds` (converted to milliseconds) as the timeout.
/// 3. Extract page info and the body's inner text, falling back to a Markdown
///    conversion of the page HTML when the evaluated value is not a string.
/// 4. Summarize the content.
///
/// The captured page is closed on every path once navigation has succeeded,
/// including when extraction or summarization fails; a close failure is only
/// logged.
///
/// # Errors
///
/// * `UnexpectedError` if the URL was already visited in this run.
/// * `BrowserError` if navigation, page-info extraction, script evaluation or
///   HTML retrieval fails.
/// * Whatever error `summarize_content` returns.
async fn process_url(
    &self,
    url: &str,
    options: &ResearchOptions,
) -> Result<ResearchResult, UtilsError> {
    // Claim the URL under the lock so concurrent tasks cannot process it twice.
    {
        let mut visited = self.visited_urls.lock().await;
        if visited.iter().any(|seen| seen.as_str() == url) {
            return Err(UtilsError::UnexpectedError("URL already visited".into()));
        }
        visited.push(url.to_string());
    }

    debug!("Navigating to {} and capturing page handle", url);
    let nav_tool = BrowserNavigateTool::new(self.browser_manager.clone());
    let nav_args = BrowserNavigateArgs {
        url: url.to_string(),
        wait_for_selector: None,
        // Saturate rather than overflow for absurdly large timeouts.
        timeout_ms: Some(options.timeout_seconds.saturating_mul(1000)),
    };
    let (page, nav_result) = nav_tool
        .navigate_and_capture_page(nav_args)
        .await
        .map_err(|e| UtilsError::BrowserError(e.to_string()))?;
    let final_url = nav_result.url;

    // Everything that can fail while the page is open runs inside this block,
    // so an early `?` exits the block only and the page is still closed below.
    let outcome = async {
        debug!("Extracting page info from captured page");
        let page_info = extract_page_info(page.clone())
            .await
            .map_err(|e| UtilsError::BrowserError(e.to_string()))?;
        let title = page_info.title;

        debug!("Extracting content from captured page");
        let eval_result = page
            .evaluate("document.body.innerText")
            .await
            .map_err(|e| UtilsError::BrowserError(format!("Failed to extract text: {}", e)))?;
        let text_value = eval_result
            .into_value()
            .map_err(|e| UtilsError::BrowserError(format!("Failed to parse text result: {}", e)))?;

        let content = if let serde_json::Value::String(text) = text_value {
            text
        } else {
            // Inner text was not a string: fall back to converting the raw HTML.
            let html = page.content().await.map_err(|e| {
                UtilsError::BrowserError(format!("Failed to get HTML content: {}", e))
            })?;
            html2md::parse_html(&html)
        };

        let summary = self.summarize_content(&title, &content).await?;

        Ok::<ResearchResult, UtilsError>(ResearchResult {
            url: final_url.clone(),
            title,
            content,
            summary,
            timestamp: chrono::Utc::now(),
            metadata: page_info.metadata,
        })
    }
    .await;

    // Best-effort cleanup on both success and failure paths.
    if let Err(e) = page.close().await {
        warn!("Failed to close page for {}: {}", final_url, e);
    }

    outcome
}
/// Asks the summarization agent for a summary of a page.
///
/// `content` is cut to its first 8000 characters (with a truncation marker
/// appended) when longer, then sent together with `title` as a single user
/// prompt. Text chunks from the response stream are concatenated until the
/// stream ends or a completion chunk arrives; other chunk kinds are ignored.
///
/// # Errors
///
/// * `AgentError` if the agent cannot be built.
/// * `LlmError` if the chat cannot be started or no text was produced.
async fn summarize_content(&self, title: &str, content: &str) -> Result<String, UtilsError> {
    use tokio_stream::StreamExt;

    const MAX_CONTENT_CHARS: usize = 8000;
    const SYSTEM_PROMPT: &str =
        "You are an AI research assistant that summarizes web content accurately \
         and concisely. Extract key information, findings, data points, and conclusions. \
         Organize information logically and provide accurate section headers where appropriate. \
         Focus on factual content, avoid speculation.";

    // A character at index MAX_CONTENT_CHARS exists only when the text is
    // longer than the limit; its byte offset is where the kept prefix ends.
    let body = match content.char_indices().nth(MAX_CONTENT_CHARS) {
        Some((cut, _)) => format!("{}... [content truncated]", &content[..cut]),
        None => content.to_string(),
    };

    let prompt = format!(
        "Please summarize the following webpage content.\n\nTitle: '{}'\n\nContent:\n{}",
        title, body
    );

    let mut stream = CandleFluentAi::agent_role("research-summarizer")
        .temperature(self.temperature)
        .max_tokens(self.max_tokens)
        .system_prompt(SYSTEM_PROMPT)
        .on_chunk(|chunk| async move { chunk })
        .into_agent()
        .map_err(|e| UtilsError::AgentError(e.to_string()))?
        .chat(move |_conversation| {
            let user_prompt = prompt.clone();
            async move { CandleChatLoop::UserPrompt(user_prompt) }
        })
        .map_err(|e| UtilsError::LlmError(e.to_string()))?;

    let mut summary = String::with_capacity(8192);
    while let Some(chunk) = stream.next().await {
        match chunk {
            CandleMessageChunk::Complete { .. } => break,
            CandleMessageChunk::Text(piece) => summary.push_str(&piece),
            _ => {}
        }
    }

    if summary.is_empty() {
        Err(UtilsError::LlmError("Empty summary generated".into()))
    } else {
        Ok(summary)
    }
}
}