use crate::content_fetch;
use crate::error::CliError;
use crate::http;
use crate::http::ProxyConfig;
use crate::search;
use crate::types::{Config, MultiSearchOutput, SearchMetadata, SearchOutput};
use rand::Rng;
use reqwest::Client;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Semaphore};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
/// Base start delay in milliseconds. Each spawned query task sleeps this value
/// multiplied by its query index (plus jitter) before it asks for a semaphore permit.
const DELAY_BASE_STAGGERED_MS: u64 = 200;
/// Exclusive upper bound, in milliseconds, of the random jitter added on top of the
/// staggered start delay (the jitter is drawn from `0..MAX_STAGGERED_JITTER_MS`).
const MAX_STAGGERED_JITTER_MS: u64 = 300;
/// Runs every query in `queries` concurrently and returns one [`SearchOutput`] per query,
/// in the same order as the input.
///
/// Each query runs in its own task spawned on a [`JoinSet`]. A task first sleeps for a
/// staggered delay (`DELAY_BASE_STAGGERED_MS * index` plus random jitter), then waits for
/// a permit from a semaphore sized by `config.parallelism` (at least 1), and only then
/// runs the query. When `config.pages <= 1` a single HTTP client is built up front and
/// cloned into every task; otherwise each task builds its own client.
///
/// Individual query failures never fail the whole call: a failed, cancelled or panicked
/// query is reported as a `SearchOutput` with its `error` and `message` fields set.
///
/// # Errors
///
/// Returns `CliError::HttpError` only when the shared HTTP client cannot be built.
#[tracing::instrument(skip_all, fields(query_count = queries.len(), parallelism = config.parallelism))]
pub async fn execute_parallel_searches(
    queries: Vec<String>,
    config: Config,
    cancellation: CancellationToken,
) -> Result<MultiSearchOutput, CliError> {
    // The result table is sized from the real query count; `query_count` is only the
    // (saturated) value reported in the output.
    let total_queries = queries.len();
    let query_count = u32::try_from(total_queries).unwrap_or(u32::MAX);
    let effective_parallelism = config.parallelism.max(1);
    let start_timestamp = chrono::Utc::now().to_rfc3339();
    tracing::info!(
        queries = query_count,
        parallel = effective_parallelism,
        pages = config.pages,
        "Starting parallel multi-query execution"
    );
    let semaphore = Arc::new(Semaphore::new(effective_parallelism as usize));
    let config = Arc::new(config);
    let flag_rate_limit = Arc::new(AtomicBool::new(false));
    let config_proxy = Arc::new(ProxyConfig::from_options(
        config.proxy.as_deref(),
        config.no_proxy,
    ));
    // Single-page runs share one client; multi-page runs build one client per task.
    let client_shared: Option<Client> = if config.pages <= 1 {
        let client = http::build_client_with_proxy(
            &config.browser_profile,
            config.timeout_seconds,
            &config.language,
            &config.country,
            &config_proxy,
        )
        .map_err(|e| CliError::HttpError {
            message: format!("failed to build shared HTTP client for multi-query: {e}"),
            cause: None,
        })?;
        Some(client)
    } else {
        None
    };
    let mut task_set: JoinSet<(usize, Result<SearchOutput, CliError>)> = JoinSet::new();
    for (index, query) in queries.into_iter().enumerate() {
        let task_semaphore = Arc::clone(&semaphore);
        let task_config = Arc::clone(&config);
        let task_cancellation = cancellation.clone();
        let task_client = client_shared.clone();
        let flag_rate_limit_task = Arc::clone(&flag_rate_limit);
        let config_proxy_task = Arc::clone(&config_proxy);
        task_set.spawn(async move {
            let jitter_ms = rand::thread_rng().gen_range(0..MAX_STAGGERED_JITTER_MS);
            // Saturating arithmetic: a huge index must not overflow the delay computation.
            let stagger_ms =
                DELAY_BASE_STAGGERED_MS.saturating_mul(u64::try_from(index).unwrap_or(u64::MAX));
            let delay_total = Duration::from_millis(stagger_ms.saturating_add(jitter_ms));
            // `biased` makes an already-cancelled token win over the sleep.
            tokio::select! {
                biased;
                _ = task_cancellation.cancelled() => {
                    return (
                        index,
                        Err(CliError::NetworkError {
                            message: format!("execution cancelled before starting query {index}"),
                        }),
                    );
                }
                _ = tokio::time::sleep(delay_total) => {}
            }
            tracing::debug!(
                permits_available = task_semaphore.available_permits(),
                query_index = index,
                "awaiting semaphore permit"
            );
            let permit = match task_semaphore.acquire_owned().await {
                Ok(p) => p,
                Err(erro) => {
                    return (
                        index,
                        Err(CliError::NetworkError {
                            message: format!("semaphore closed: {erro}"),
                        }),
                    );
                }
            };
            tracing::debug!(index, query = %query, "permit acquired, starting task");
            // Cancellation may have happened while this task was waiting for its permit.
            if task_cancellation.is_cancelled() {
                drop(permit);
                return (
                    index,
                    Err(CliError::NetworkError {
                        message: "execution cancelled after acquiring permit".into(),
                    }),
                );
            }
            let client_result = match task_client {
                Some(shared) => Ok(shared),
                None => http::build_client_with_proxy(
                    &task_config.browser_profile,
                    task_config.timeout_seconds,
                    &task_config.language,
                    &task_config.country,
                    &config_proxy_task,
                )
                .map_err(|e| CliError::HttpError {
                    message: format!("failed to build isolated Client for query: {e}"),
                    cause: None,
                }),
            };
            let result = match client_result {
                Ok(client) => {
                    execute_query_with_cancellation(
                        &query,
                        &client,
                        &task_config,
                        &flag_rate_limit_task,
                        &task_cancellation,
                    )
                    .await
                }
                Err(erro) => Err(erro),
            };
            drop(permit);
            (index, result)
        });
    }
    let mut ordered_results: Vec<Option<SearchOutput>> =
        (0..total_queries).map(|_| None).collect();
    // A `JoinError` does not carry the query index, so join failures are only recorded
    // here. They are attached to the slots that are still empty once every task has
    // finished; filling a slot right away could hit a query that is still running and
    // would label the entry with the wrong query.
    let mut join_failures: Vec<String> = Vec::new();
    while let Some(resultado_task) = task_set.join_next().await {
        match resultado_task {
            Ok((index, Ok(output))) => {
                if let Some(slot) = ordered_results.get_mut(index) {
                    *slot = Some(output);
                }
            }
            Ok((index, Err(erro))) => {
                tracing::warn!(index, ?erro, "query failed, generating error SearchOutput");
                let failure = error_output(index, erro, &config);
                if let Some(slot) = ordered_results.get_mut(index) {
                    *slot = Some(failure);
                }
            }
            Err(erro_join) => {
                let message = if erro_join.is_panic() {
                    tracing::error!(?erro_join, "task panicked — permit recovered via RAII");
                    format!("task panicked: {erro_join}")
                } else {
                    tracing::warn!(?erro_join, "task cancelled or aborted");
                    format!("task cancelled or aborted: {erro_join}")
                };
                join_failures.push(message);
            }
        }
    }
    // Every slot still empty belongs to a task that never returned a value. Build its
    // error entry with the slot's own index so the reported query text is correct.
    // With several failed tasks the messages are handed out in join order, so a message
    // may land on a different failed query than the one that produced it.
    let mut pending_failures = join_failures.into_iter();
    let searches: Vec<SearchOutput> = ordered_results
        .into_iter()
        .enumerate()
        .map(|(index, slot)| {
            slot.unwrap_or_else(|| {
                let message = pending_failures
                    .next()
                    .unwrap_or_else(|| format!("missing result for query {index}"));
                error_output(index, CliError::NetworkError { message }, &config)
            })
        })
        .collect();
    tracing::info!(total = searches.len(), "multi-query complete");
    Ok(MultiSearchOutput {
        query_count,
        timestamp: start_timestamp,
        parallelism: effective_parallelism,
        searches,
    })
}
/// Summary counters returned by `execute_parallel_searches_streaming` once the
/// stream has ended.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Number of queries submitted (saturated at `u32::MAX`).
    pub total: u32,
    /// Outputs that carried no `error` value.
    pub successes: u32,
    /// Outputs that carried an `error` value, plus tasks that failed to join.
    pub errors: u32,
    /// RFC 3339 timestamp taken when the streaming run started.
    pub start_timestamp: String,
    /// Parallelism actually applied (the configured value, raised to at least 1).
    pub parallelism: u32,
}
#[tracing::instrument(skip_all, fields(query_count = queries.len(), parallelism = config.parallelism))]
pub async fn execute_parallel_searches_streaming(
queries: Vec<String>,
config: Config,
cancellation: CancellationToken,
output_channel: mpsc::Sender<(usize, SearchOutput)>,
) -> Result<StreamStats, CliError> {
let query_count = u32::try_from(queries.len()).unwrap_or(u32::MAX);
let effective_parallelism = config.parallelism.max(1);
let start_timestamp = chrono::Utc::now().to_rfc3339();
tracing::info!(
queries = query_count,
parallel = effective_parallelism,
"Starting parallel multi-query streaming execution"
);
let semaphore = Arc::new(Semaphore::new(effective_parallelism as usize));
let config = Arc::new(config);
let flag_rate_limit = Arc::new(AtomicBool::new(false));
let config_proxy = Arc::new(ProxyConfig::from_options(
config.proxy.as_deref(),
config.no_proxy,
));
let client_shared: Option<Client> = if config.pages <= 1 {
let client = http::build_client_with_proxy(
&config.browser_profile,
config.timeout_seconds,
&config.language,
&config.country,
&config_proxy,
)
.map_err(|e| CliError::HttpError {
message: format!("failed to build shared HTTP client for streaming: {e}"),
cause: None,
})?;
Some(client)
} else {
None
};
let mut task_set: JoinSet<(usize, SearchOutput)> = JoinSet::new();
for (index, query) in queries.into_iter().enumerate() {
let task_semaphore = Arc::clone(&semaphore);
let task_config = Arc::clone(&config);
let task_cancellation = cancellation.clone();
let task_client = client_shared.clone();
let flag_rate_limit_task = Arc::clone(&flag_rate_limit);
let config_proxy_task = Arc::clone(&config_proxy);
task_set.spawn(async move {
let jitter_ms = rand::thread_rng().gen_range(0..MAX_STAGGERED_JITTER_MS);
let delay_total = Duration::from_millis(
DELAY_BASE_STAGGERED_MS.saturating_mul(index as u64) + jitter_ms,
);
tokio::select! {
biased;
_ = task_cancellation.cancelled() => {
return (
index,
error_output(
index,
CliError::NetworkError { message: format!("execution cancelled before query {index}") },
&task_config,
),
);
}
_ = tokio::time::sleep(delay_total) => {}
}
tracing::debug!(
permits_available = task_semaphore.available_permits(),
query_index = index,
"awaiting semaphore permit (streaming)"
);
let permit = match task_semaphore.acquire_owned().await {
Ok(p) => p,
Err(erro) => {
return (
index,
error_output(
index,
CliError::NetworkError { message: format!("semaphore closed: {erro}") },
&task_config,
),
);
}
};
tracing::debug!(query_index = index, "permit acquired (streaming)");
if task_cancellation.is_cancelled() {
drop(permit);
return (
index,
error_output(
index,
CliError::NetworkError { message: "execution cancelled after permit".into() },
&task_config,
),
);
}
let client_result = match task_client {
Some(c) => Ok(c),
None => http::build_client_with_proxy(
&task_config.browser_profile,
task_config.timeout_seconds,
&task_config.language,
&task_config.country,
&config_proxy_task,
)
.map_err(|e| CliError::HttpError { message: format!("failed to build isolated Client: {e}"), cause: None }),
};
let result = match client_result {
Ok(client) => {
execute_query_with_cancellation(
&query,
&client,
&task_config,
&flag_rate_limit_task,
&task_cancellation,
)
.await
}
Err(erro) => Err(erro),
};
drop(permit);
match result {
Ok(output) => (index, output),
Err(erro) => (index, error_output(index, erro, &task_config)),
}
});
}
let mut success_count: u32 = 0;
let mut error_count: u32 = 0;
while let Some(task_result) = task_set.join_next().await {
match task_result {
Ok((index, output)) => {
if output.error.is_some() {
error_count = error_count.saturating_add(1);
} else {
success_count = success_count.saturating_add(1);
}
if let Err(send_error) = output_channel.send((index, output)).await {
tracing::warn!(
?send_error,
"streaming consumer closed channel — aborting send"
);
task_set.abort_all();
break;
}
}
Err(join_err) => {
if join_err.is_panic() {
tracing::error!(
?join_err,
"task panicked in streaming — permit recovered via RAII"
);
} else {
tracing::warn!(?join_err, "task cancelled in streaming");
}
error_count = error_count.saturating_add(1);
}
}
}
tracing::info!(
total = query_count,
successes = success_count,
errors = error_count,
"streaming complete"
);
Ok(StreamStats {
total: query_count,
successes: success_count,
errors: error_count,
start_timestamp,
parallelism: effective_parallelism,
})
}
/// Runs a single query through `search::search_with_pagination` and packages the
/// outcome as a [`SearchOutput`].
///
/// A search failure is not an `Err`: it is returned as `Ok` with an empty result list
/// and the `error` / `message` fields filled from the failure reason. On success the
/// output is passed through `content_fetch::enrich_with_content` before being returned.
///
/// # Errors
///
/// Returns `CliError::NetworkError` only when `cancellation` is already cancelled on entry.
async fn execute_query_with_cancellation(
    query: &str,
    client: &Client,
    config: &Config,
    flag_rate_limit: &Arc<AtomicBool>,
    cancellation: &CancellationToken,
) -> Result<SearchOutput, CliError> {
    let started = Instant::now();
    if cancellation.is_cancelled() {
        return Err(CliError::NetworkError {
            message: format!("execution cancelled before request for {query:?}"),
        });
    }
    tracing::info!(query = %query, endpoint = config.endpoint.as_str(), "sending request");

    // The search layer receives a private copy of the config whose `query` field is
    // this task's query.
    let mut per_query_config = config.clone();
    per_query_config.query = query.to_string();

    let search_result = search::search_with_pagination(
        client,
        &per_query_config,
        query,
        flag_rate_limit,
        cancellation,
    )
    .await;

    let aggregated = match search_result {
        Err(reason) => {
            // Clamp the u128 millisecond count into u64.
            let execution_time_ms =
                u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
            let timestamp = chrono::Utc::now().to_rfc3339();
            let selectors_hash = crate::pipeline::calculate_selectors_hash(&config.selectors);
            let used_proxy =
                ProxyConfig::from_options(config.proxy.as_deref(), config.no_proxy).is_active();
            let metadata = SearchMetadata {
                execution_time_ms,
                selectors_hash,
                retries: config.retries,
                used_fallback_endpoint: false,
                concurrent_fetches: 0,
                fetch_successes: 0,
                fetch_failures: 0,
                used_chrome: false,
                user_agent: config.user_agent.clone(),
                used_proxy,
                identity_used: None,
                cascade_level: None,
            };
            return Ok(SearchOutput {
                query: query.to_string(),
                engine: "duckduckgo".to_string(),
                endpoint: config.endpoint.as_str().to_string(),
                timestamp,
                region: search::format_kl(&config.language, &config.country),
                result_count: 0,
                results: Vec::new(),
                pages_fetched: 0,
                error: Some(reason.as_error_code().to_string()),
                message: Some(reason.message()),
                metadata,
            });
        }
        Ok(found) => found,
    };

    let result_count = u32::try_from(aggregated.results.len()).unwrap_or(u32::MAX);
    let selectors_hash = crate::pipeline::calculate_selectors_hash(&config.selectors);
    let execution_time_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
    let timestamp = chrono::Utc::now().to_rfc3339();
    // The first attempt is not a retry.
    let retries = aggregated.attempts.saturating_sub(1);
    let used_proxy =
        ProxyConfig::from_options(config.proxy.as_deref(), config.no_proxy).is_active();

    let mut output = SearchOutput {
        query: query.to_string(),
        engine: "duckduckgo".to_string(),
        endpoint: aggregated.effective_endpoint.as_str().to_string(),
        timestamp,
        region: search::format_kl(&config.language, &config.country),
        result_count,
        results: aggregated.results,
        pages_fetched: aggregated.pages_fetched,
        error: None,
        message: None,
        metadata: SearchMetadata {
            execution_time_ms,
            selectors_hash,
            retries,
            used_fallback_endpoint: aggregated.used_fallback_lite,
            concurrent_fetches: 0,
            fetch_successes: 0,
            fetch_failures: 0,
            used_chrome: false,
            user_agent: config.user_agent.clone(),
            used_proxy,
            identity_used: None,
            cascade_level: None,
        },
    };
    content_fetch::enrich_with_content(&mut output, client, config, cancellation).await;
    Ok(output)
}
/// Builds the placeholder [`SearchOutput`] reported for a query that produced no
/// search result (cancelled, client could not be built, task failed, ...).
///
/// * `index` - position of the query in `config.queries`; an out-of-range index yields
///   an empty `query` string instead of panicking.
/// * `erro` - the failure; its alternate `Display` rendering becomes `message`.
/// * `config` - source of the region, user agent, selectors hash and endpoint.
///
/// The `error` code is always `NETWORK_ERROR`, and all counters and timings are zero.
/// The `endpoint` field reports the configured endpoint, matching what
/// `execute_query_with_cancellation` reports on its own failure path.
#[cold]
fn error_output(index: usize, erro: CliError, config: &Config) -> SearchOutput {
    let query = config.queries.get(index).cloned().unwrap_or_default();
    let message = format!("{erro:#}");
    let timestamp = chrono::Utc::now().to_rfc3339();
    let selectors_hash = crate::pipeline::calculate_selectors_hash(&config.selectors);
    SearchOutput {
        query,
        engine: "duckduckgo".to_string(),
        // Report the endpoint the run was configured with rather than a fixed literal.
        endpoint: config.endpoint.as_str().to_string(),
        timestamp,
        region: search::format_kl(&config.language, &config.country),
        result_count: 0,
        results: Vec::new(),
        pages_fetched: 0,
        error: Some(crate::error::codes::NETWORK_ERROR.to_string()),
        message: Some(message),
        metadata: SearchMetadata {
            execution_time_ms: 0,
            selectors_hash,
            retries: 0,
            used_fallback_endpoint: false,
            concurrent_fetches: 0,
            fetch_successes: 0,
            fetch_failures: 0,
            used_chrome: false,
            user_agent: config.user_agent.clone(),
            used_proxy: false,
            identity_used: None,
            cascade_level: None,
        },
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Endpoint, OutputFormat, SafeSearch, SelectorConfig};

    /// User agent used both for `Config::user_agent` and for the browser profile.
    const TEST_USER_AGENT: &str = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36";

    /// Builds a quiet, single-page, proxy-less `Config` for the given queries.
    fn test_config(queries: Vec<String>, parallelism: u32) -> Config {
        Config {
            query: queries.first().cloned().unwrap_or_default(),
            queries,
            num_results: None,
            format: OutputFormat::Json,
            timeout_seconds: 15,
            language: "pt".to_string(),
            country: "br".to_string(),
            verbose: false,
            quiet: true,
            user_agent: TEST_USER_AGENT.to_string(),
            browser_profile: crate::http::create_browser_profile(TEST_USER_AGENT),
            parallelism,
            pages: 1,
            retries: 0,
            endpoint: Endpoint::Html,
            time_filter: None,
            safe_search: SafeSearch::Moderate,
            stream_mode: false,
            output_file: None,
            fetch_content: false,
            max_content_length: 10_000,
            proxy: None,
            no_proxy: false,
            global_timeout_seconds: 60,
            match_platform_ua: false,
            per_host_limit: 2,
            chrome_path: None,
            selectors: std::sync::Arc::new(SelectorConfig::default()),
        }
    }

    /// The error placeholder carries the query text, region and error fields.
    #[test]
    fn error_output_fills_required_fields() {
        let config = test_config(vec!["alfa".into(), "beta".into()], 2);
        let failure = CliError::NetworkError {
            message: "synthetic test failure".into(),
        };

        let placeholder = error_output(1, failure, &config);

        assert_eq!(placeholder.query, "beta");
        assert_eq!(placeholder.result_count, 0);
        assert!(placeholder.results.is_empty());
        assert!(placeholder.error.is_some());
        assert!(placeholder.message.is_some());
        assert_eq!(placeholder.region, "br-pt");
    }

    /// An index past the end of `config.queries` yields an empty query, not a panic.
    #[test]
    fn error_output_index_out_of_bounds_uses_empty_string() {
        let config = test_config(vec!["apenas uma".into()], 1);
        let failure = CliError::NetworkError {
            message: "out of bounds".into(),
        };

        let placeholder = error_output(99, failure, &config);

        assert!(placeholder.query.is_empty());
        assert!(placeholder.error.is_some());
    }

    /// With a token cancelled up front, every query is reported as an error entry and
    /// the call itself still succeeds.
    #[tokio::test]
    async fn parallel_searches_cancelled_before_spawn_returns_errors() {
        let token = CancellationToken::new();
        token.cancel();
        let config = test_config(
            vec!["query-a".into(), "query-b".into(), "query-c".into()],
            3,
        );
        let queries = config.queries.clone();

        let multi = execute_parallel_searches(queries, config, token)
            .await
            .expect("function should return Ok even when all fail");

        assert_eq!(multi.query_count, 3);
        assert_eq!(multi.searches.len(), 3);
        assert_eq!(multi.parallelism, 3);
        for entry in multi.searches.iter() {
            assert!(
                entry.error.is_some(),
                "query {:?} deveria ter falhado com cancelamento",
                entry.query
            );
        }
    }

    /// The selectors hash is a 16-character hexadecimal string.
    #[test]
    fn calculate_selectors_hash_returns_16_chars() {
        let selectors = SelectorConfig::default();
        let digest = crate::pipeline::calculate_selectors_hash(&selectors);
        assert_eq!(digest.len(), 16);
        assert!(digest.chars().all(|c| c.is_ascii_hexdigit()));
    }
}