use crate::content;
use crate::types::{Config, SearchOutput};
use reqwest::Client;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, Semaphore};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "chrome")]
use crate::browser::{detect_chrome, extract_text_with_chrome, ChromeBrowser};
/// Shared registry mapping a host name to the semaphore that caps how many
/// fetches may target that host at the same time.
pub type PerHostSemaphoreMap = Arc<Mutex<HashMap<String, Arc<Semaphore>>>>;

/// Returns the semaphore registered for `host`, creating it on first use.
///
/// A newly created semaphore gets `limit` permits (never fewer than 1).
/// `limit` is only consulted at creation time: later calls for the same host
/// return the existing semaphore unchanged, whatever `limit` they pass.
pub async fn get_semaphore_for_host(
    mapa: &PerHostSemaphoreMap,
    host: &str,
    limit: usize,
) -> Arc<Semaphore> {
    let mut registry = mapa.lock().await;
    let slot = registry.entry(host.to_owned()).or_insert_with(|| {
        // Zero permits would deadlock every fetch against this host.
        let permits = limit.max(1);
        Arc::new(Semaphore::new(permits))
    });
    Arc::clone(slot)
}
/// Returns the lowercased host component of `url`.
///
/// Falls back to the literal `"unknown"` when `url` cannot be parsed or has
/// no host (so every URL maps to some per-host bucket).
#[inline]
pub fn extract_host(url: &str) -> String {
    let Ok(parsed) = reqwest::Url::parse(url) else {
        return "unknown".to_string();
    };
    parsed
        .host_str()
        .map_or_else(|| "unknown".to_string(), str::to_lowercase)
}
/// Detects and launches Chrome for use as a fallback content extractor.
///
/// Uses `config.chrome_path` as a manual override for detection and
/// `config.proxy` for the launched browser, with a 30-second launch timeout.
/// Returns `None`, after logging the reason, when Chrome is not detected or
/// fails to launch. Enrichment then continues with HTTP only.
#[cfg(feature = "chrome")]
async fn launch_chrome_fallback(config: &Config) -> Option<Arc<Mutex<ChromeBrowser>>> {
    let manual_path = config.chrome_path.as_deref();
    match detect_chrome(manual_path) {
        Ok(path) => {
            tracing::info!(path = %path.display(), "Chrome detected — enabling fallback");
            let timeout_launch = std::time::Duration::from_secs(30);
            match ChromeBrowser::launch(&path, config.proxy.as_deref(), timeout_launch).await {
                Ok(n) => Some(Arc::new(Mutex::new(n))),
                Err(erro) => {
                    tracing::warn!(
                        ?erro,
                        "failed to launch Chrome — continuing with HTTP only"
                    );
                    None
                }
            }
        }
        Err(erro) => {
            tracing::info!(?erro, "Chrome not detected — continuing with HTTP only");
            None
        }
    }
}

/// Fetches page content for every result in `output`, in parallel.
///
/// Does nothing when `config.fetch_content` is false or there are no results.
///
/// Concurrency is bounded by:
/// - a global semaphore with `config.parallelism` permits (at least 1);
/// - a per-host semaphore with `config.per_host_limit` permits (at least 1).
///
/// Each result is first extracted over HTTP via
/// `content::extract_http_content`. When the `chrome` feature is enabled and
/// HTTP yields empty text, a single shared Chrome instance is tried as a
/// fallback.
///
/// Successful results get `content`, `content_size` and
/// `content_extraction_method` (`"http"` or `"chrome"`) filled in.
/// `output.metadata` receives `concurrent_fetches`, `fetch_successes`,
/// `fetch_failures` and `used_chrome`. Results that were never fetched
/// because `cancellation` fired before their task was spawned count as
/// failures, so `fetch_successes + fetch_failures` equals the result count.
#[tracing::instrument(skip_all, fields(result_count = output.result_count, parallelism = config.parallelism))]
pub async fn enrich_with_content(
    output: &mut SearchOutput,
    client: &Client,
    config: &Config,
    cancellation: &CancellationToken,
) {
    if !config.fetch_content || output.results.is_empty() {
        return;
    }
    let total = output.results.len();
    tracing::info!(
        total,
        parallel = config.parallelism,
        "starting parallel enrichment with --fetch-content"
    );
    let semaphore = Arc::new(Semaphore::new(config.parallelism.max(1) as usize));
    let mapa_por_host: PerHostSemaphoreMap =
        Arc::new(Mutex::new(HashMap::with_capacity(total.min(32))));
    let per_host_limit = config.per_host_limit.max(1);
    let max_size = config.max_content_length;
    #[cfg(feature = "chrome")]
    let navegador_chrome: Option<Arc<Mutex<ChromeBrowser>>> =
        launch_chrome_fallback(config).await;
    #[cfg(not(feature = "chrome"))]
    {
        if config.chrome_path.is_some() {
            tracing::warn!(
                "--chrome-path provided but binary was not compiled with --features chrome — ignoring"
            );
        }
    }
    // (index into output.results, Some((text, size, method)) on success).
    type ResultadoFetch = (usize, Option<(String, u32, String)>);
    let mut tasks: JoinSet<ResultadoFetch> = JoinSet::new();
    // How many results actually received a fetch task. Any shortfall is due
    // to cancellation being observed while spawning.
    let mut spawned: usize = 0;
    for (index, result_item) in output.results.iter().enumerate() {
        if cancellation.is_cancelled() {
            tracing::warn!("cancellation detected — aborting fetch spawns");
            break;
        }
        let url = result_item.url.clone();
        let task_client = client.clone();
        let task_semaphore = Arc::clone(&semaphore);
        let mapa_task = Arc::clone(&mapa_por_host);
        let task_cancellation = cancellation.clone();
        #[cfg(feature = "chrome")]
        let nav_task: Option<Arc<Mutex<ChromeBrowser>>> = navegador_chrome.as_ref().map(Arc::clone);
        tasks.spawn(async move {
            tracing::debug!(
                permits_available = task_semaphore.available_permits(),
                fetch_index = index,
                "awaiting global semaphore permit"
            );
            let Ok(permit_global) = task_semaphore.acquire_owned().await else {
                tracing::debug!(index, "global semaphore closed — skipping");
                return (index, None);
            };
            // Waiting for a permit can take a while; re-check before doing work.
            if task_cancellation.is_cancelled() {
                drop(permit_global);
                return (index, None);
            }
            let host = extract_host(&url);
            let semaforo_host = get_semaphore_for_host(&mapa_task, &host, per_host_limit).await;
            tracing::debug!(
                permits_available = semaforo_host.available_permits(),
                fetch_index = index,
                %host,
                "awaiting per-host semaphore permit"
            );
            let Ok(permit_host) = semaforo_host.acquire_owned().await else {
                tracing::debug!(index, host, "per-host semaphore closed — skipping");
                drop(permit_global);
                return (index, None);
            };
            if task_cancellation.is_cancelled() {
                drop(permit_host);
                drop(permit_global);
                return (index, None);
            }
            let result_item =
                content::extract_http_content(&task_client, &url, max_size, &task_cancellation)
                    .await;
            let retorno = match result_item {
                Ok(Some((text, size))) if !text.is_empty() => {
                    (index, Some((text, size, "http".to_string())))
                }
                // Content was returned but the extracted text is empty: try the
                // Chrome fallback when it is compiled in and was launched.
                Ok(Some((_vazio, _size_original))) => {
                    #[cfg(feature = "chrome")]
                    {
                        if let Some(nav) = nav_task {
                            tracing::debug!(
                                index,
                                url,
                                "HTTP content insufficient — trying Chrome"
                            );
                            // NOTE(review): both permits stay held while waiting
                            // for the shared Chrome lock, so Chrome fallbacks run
                            // one at a time and can occupy global slots meanwhile.
                            let mut guarda = nav.lock().await;
                            match extract_text_with_chrome(
                                &mut guarda,
                                &url,
                                max_size,
                                std::time::Duration::from_secs(30),
                            )
                            .await
                            {
                                Ok(text) if !text.is_empty() => {
                                    let size_cast = u32::try_from(text.len()).unwrap_or(u32::MAX);
                                    drop(permit_host);
                                    drop(permit_global);
                                    return (index, Some((text, size_cast, "chrome".to_string())));
                                }
                                Ok(_) => {
                                    tracing::debug!(index, url, "Chrome also returned empty");
                                }
                                Err(error) => {
                                    tracing::debug!(index, url, ?error, "Chrome failed");
                                }
                            }
                        }
                    }
                    (index, None)
                }
                Ok(None) => {
                    tracing::debug!(index, url, "content-type not HTML — no content");
                    (index, None)
                }
                Err(error) => {
                    tracing::debug!(index, url, ?error, "failed to extract HTTP content");
                    (index, None)
                }
            };
            drop(permit_host);
            drop(permit_global);
            retorno
        });
        spawned += 1;
    }
    let mut sucessos: u32 = 0;
    // Results skipped by cancellation before spawning start out as failures,
    // so successes + failures always equals `total`.
    let mut falhas: u32 = u32::try_from(total.saturating_sub(spawned)).unwrap_or(u32::MAX);
    let mut usou_chrome: bool = false;
    while let Some(join_res) = tasks.join_next().await {
        match join_res {
            Ok((index, Some((text, size, method)))) => {
                if index < output.results.len() && !text.is_empty() {
                    let res = &mut output.results[index];
                    if method == "chrome" {
                        usou_chrome = true;
                    }
                    res.content = Some(text);
                    res.content_size = Some(size);
                    res.content_extraction_method = Some(method);
                    sucessos = sucessos.saturating_add(1);
                } else {
                    falhas = falhas.saturating_add(1);
                }
            }
            Ok((_, None)) => {
                falhas = falhas.saturating_add(1);
            }
            Err(error_join) => {
                if error_join.is_panic() {
                    tracing::error!(
                        ?error_join,
                        "fetch task panicked — permit recovered via RAII"
                    );
                } else {
                    tracing::warn!(?error_join, "fetch task cancelled");
                }
                falhas = falhas.saturating_add(1);
            }
        }
    }
    output.metadata.concurrent_fetches = u32::try_from(total).unwrap_or(u32::MAX);
    output.metadata.fetch_successes = sucessos;
    output.metadata.fetch_failures = falhas;
    if usou_chrome {
        output.metadata.used_chrome = true;
    }
    // All tasks have been joined, so this drops the last reference to the browser.
    #[cfg(feature = "chrome")]
    if let Some(nav_arc) = navegador_chrome {
        drop(nav_arc);
        tracing::debug!("Chrome dropped after enrichment");
    }
    tracing::info!(total, sucessos, falhas, "content enrichment complete");
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Endpoint, OutputFormat, SafeSearch, SearchMetadata, SearchResult};

    /// Builds a minimal `Config` with content fetching enabled, the given
    /// global parallelism and maximum content length.
    fn test_config(parallelism: u32, max_tam: usize) -> Config {
        Config {
            query: "q".to_string(),
            queries: vec!["q".to_string()],
            num_results: None,
            format: OutputFormat::Json,
            timeout_seconds: 5,
            language: "pt".to_string(),
            country: "br".to_string(),
            verbose: false,
            quiet: true,
            user_agent: "Mozilla/5.0".to_string(),
            browser_profile: crate::http::create_browser_profile("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"),
            parallelism,
            pages: 1,
            retries: 0,
            endpoint: Endpoint::Html,
            time_filter: None,
            safe_search: SafeSearch::Moderate,
            stream_mode: false,
            output_file: None,
            fetch_content: true,
            max_content_length: max_tam,
            proxy: None,
            no_proxy: false,
            global_timeout_seconds: 60,
            match_platform_ua: false,
            per_host_limit: 2,
            chrome_path: None,
            selectors: std::sync::Arc::new(crate::types::SelectorConfig::default()),
        }
    }

    /// Builds a `SearchOutput` with no results and zeroed fetch metadata.
    fn empty_output() -> SearchOutput {
        SearchOutput {
            query: "q".to_string(),
            engine: "duckduckgo".to_string(),
            endpoint: "html".to_string(),
            timestamp: "t".to_string(),
            region: "br-pt".to_string(),
            result_count: 0,
            results: vec![],
            pages_fetched: 1,
            error: None,
            message: None,
            metadata: SearchMetadata {
                execution_time_ms: 0,
                selectors_hash: "x".to_string(),
                retries: 0,
                used_fallback_endpoint: false,
                concurrent_fetches: 0,
                fetch_successes: 0,
                fetch_failures: 0,
                used_chrome: false,
                user_agent: "ua".to_string(),
                used_proxy: false,
                identity_used: None,
                cascade_level: None,
            },
        }
    }

    /// With `fetch_content == false` nothing is fetched and metadata is untouched.
    #[tokio::test]
    async fn enrich_with_content_no_op_when_flag_false() {
        let cliente = reqwest::Client::new();
        let mut cfg = test_config(3, 1000);
        cfg.fetch_content = false;
        let mut output = empty_output();
        output.results.push(SearchResult {
            position: 1,
            title: "Um".to_string(),
            url: "http://inexistente.local/a".to_string(),
            display_url: None,
            snippet: None,
            original_title: None,
            content: None,
            content_size: None,
            content_extraction_method: None,
        });
        let token = CancellationToken::new();
        enrich_with_content(&mut output, &cliente, &cfg, &token).await;
        assert!(output.results[0].content.is_none());
        assert_eq!(output.metadata.concurrent_fetches, 0);
    }

    #[test]
    fn extract_host_valid_url_returns_host() {
        assert_eq!(extract_host("https://www.example.com/a"), "www.example.com");
        assert_eq!(extract_host("https://API.test/x"), "api.test");
    }

    #[test]
    fn extract_host_invalid_url_returns_unknown() {
        assert_eq!(extract_host("nao-eh-url"), "unknown");
        assert_eq!(extract_host(""), "unknown");
    }

    /// The first call for a host fixes its limit; other hosts get their own semaphore.
    #[tokio::test]
    async fn get_semaphore_for_host_creates_once_per_host() {
        let mapa: PerHostSemaphoreMap = Arc::new(Mutex::new(HashMap::new()));
        let sema_a1 = get_semaphore_for_host(&mapa, "a.com", 3).await;
        let sema_a2 = get_semaphore_for_host(&mapa, "a.com", 99).await;
        assert!(Arc::ptr_eq(&sema_a1, &sema_a2));
        assert_eq!(sema_a1.available_permits(), 3);
        let sema_b = get_semaphore_for_host(&mapa, "b.com", 5).await;
        assert!(!Arc::ptr_eq(&sema_a1, &sema_b));
        assert_eq!(sema_b.available_permits(), 5);
        let mapa_guardado = mapa.lock().await;
        assert_eq!(mapa_guardado.len(), 2);
    }

    /// 20 tasks sharing one host semaphore with limit 2 never exceed 2 in flight.
    #[tokio::test]
    async fn get_semaphore_limits_simultaneous_concurrency_on_same_host() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        let mapa: PerHostSemaphoreMap = Arc::new(Mutex::new(HashMap::new()));
        let contador_simultaneo = Arc::new(AtomicUsize::new(0));
        let pico_simultaneo = Arc::new(AtomicUsize::new(0));
        let mut tarefas = Vec::with_capacity(20);
        for _ in 0..20 {
            let mapa = Arc::clone(&mapa);
            let contador = Arc::clone(&contador_simultaneo);
            let pico = Arc::clone(&pico_simultaneo);
            tarefas.push(tokio::spawn(async move {
                let sema = get_semaphore_for_host(&mapa, "same-host.com", 2).await;
                let _permit = sema
                    .acquire_owned()
                    .await
                    .expect("BUG: semaphore should not be closed");
                let atual = contador.fetch_add(1, Ordering::SeqCst) + 1;
                // Lock-free "max" update of the observed peak.
                let mut p = pico.load(Ordering::SeqCst);
                while atual > p {
                    match pico.compare_exchange(p, atual, Ordering::SeqCst, Ordering::SeqCst) {
                        Ok(_) => break,
                        Err(novo) => p = novo,
                    }
                }
                tokio::time::sleep(std::time::Duration::from_millis(30)).await;
                contador.fetch_sub(1, Ordering::SeqCst);
            }));
        }
        for t in tarefas {
            // Propagate task panics. Swallowing them would let the peak
            // assertion pass vacuously if every task failed.
            t.await.expect("BUG: concurrency test task panicked");
        }
        assert!(
            pico_simultaneo.load(Ordering::SeqCst) <= 2,
            "simultaneous peak {} exceeded limit 2",
            pico_simultaneo.load(Ordering::SeqCst)
        );
    }

    /// A token cancelled before the call yields no successes and no content.
    #[tokio::test]
    async fn enrich_with_content_cancelled_marks_failures() {
        let cliente = reqwest::Client::builder()
            .timeout(std::time::Duration::from_millis(100))
            .build()
            .unwrap();
        let cfg = test_config(2, 1000);
        let mut output = empty_output();
        for i in 0..3 {
            output.results.push(SearchResult {
                position: (i + 1) as u32,
                title: format!("r{i}"),
                url: format!("http://127.0.0.1:1/{i}"),
                display_url: None,
                snippet: None,
                original_title: None,
                content: None,
                content_size: None,
                content_extraction_method: None,
            });
        }
        let token = CancellationToken::new();
        token.cancel();
        enrich_with_content(&mut output, &cliente, &cfg, &token).await;
        assert_eq!(output.metadata.fetch_successes, 0);
        assert!(output.results.iter().all(|r| r.content.is_none()));
    }
}