use crate::content_fetch;
use crate::error::CliError;
use crate::http;
use crate::http::ProxyConfig;
use crate::parallel;
use crate::search;
use crate::types::{Config, MultiSearchOutput, SearchMetadata, SearchOutput, SelectorConfig};
use std::collections::HashSet;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
/// Outcome of [`execute_pipeline`]; the variant reflects how the queries were executed.
#[derive(Debug, Clone)]
pub enum PipelineResult {
    /// Exactly one effective query ran; holds its aggregated output.
    Single(Box<SearchOutput>),
    /// Several queries ran in parallel and were collected into one output.
    Multi(Box<MultiSearchOutput>),
    /// Several queries ran in streaming mode; items were emitted as they arrived,
    /// so only the run statistics are kept.
    Stream(crate::parallel::StreamStats),
}
impl PipelineResult {
    /// Returns an aggregate count for this result, saturating at `u32::MAX`.
    ///
    /// - `Single`: the output's `result_count`.
    /// - `Multi`: the saturating sum of every search's `result_count`.
    /// - `Stream`: `StreamStats::successes`. NOTE(review): this is presumably the number
    ///   of successful queries rather than a count of results; confirm in `crate::parallel`.
    pub fn total_results(&self) -> u32 {
        match self {
            Self::Single(single) => single.result_count,
            Self::Multi(multi) => multi
                .searches
                .iter()
                .fold(0u32, |total, search| total.saturating_add(search.result_count)),
            Self::Stream(stats) => stats.successes,
        }
    }
}
/// Executes every configured query and returns a result shaped by how many there are.
///
/// Dispatch on `config.queries.len()`:
/// - `0`: returns [`CliError::InvalidConfig`].
/// - `1`: runs [`execute_single_search`] with `config.query` set to the sole query.
///   `--stream` is ignored in this case and a warning is logged.
/// - more: runs the queries in parallel, streaming each result when `config.stream_mode`
///   is set, otherwise collecting them into a [`MultiSearchOutput`].
///
/// # Errors
/// Returns `InvalidConfig` for an empty query list and propagates any error from the
/// single, parallel, or streaming executors.
pub async fn execute_pipeline(
    config: Config,
    cancellation: CancellationToken,
) -> Result<PipelineResult, CliError> {
    match config.queries.len() {
        0 => Err(CliError::InvalidConfig {
            message: "no queries to execute (list empty after filtering)".into(),
        }),
        1 => {
            if config.stream_mode {
                tracing::warn!(
                    "--stream ignored in single-query mode (only 1 effective query); \
                     emitting default aggregated output"
                );
            }
            // `config` is owned and not used after this point, so reuse it instead of
            // deep-cloning the whole configuration.
            let mut cfg_single = config;
            cfg_single.query = cfg_single.queries[0].clone();
            let output = execute_single_search(&cfg_single, &cancellation).await?;
            Ok(PipelineResult::Single(Box::new(output)))
        }
        _ => {
            if config.stream_mode {
                return execute_pipeline_streaming(config, cancellation).await;
            }
            // The parallel executor takes ownership of both the query list and the config.
            let queries = config.queries.clone();
            let multi = parallel::execute_parallel_searches(queries, config, cancellation).await?;
            Ok(PipelineResult::Multi(Box::new(multi)))
        }
    }
}
/// Runs several queries in parallel and emits each result as soon as it is received.
///
/// A spawned consumer task receives `(index, output)` pairs over a bounded channel
/// (capacity `2 * parallelism`, at least 2). It writes each one through `crate::output`
/// using `config.output_file` and the configured format: `Auto`/`Json` become NDJSON,
/// while `Text` and `Markdown` use their streaming renderers.
///
/// A broken pipe while emitting stops the consumer quietly and is treated as success.
/// Any other emit error aborts the consumer and is returned.
///
/// # Errors
/// - An error from [`parallel::execute_parallel_searches_streaming`] is returned as-is.
///   The consumer's join handle is dropped (detached) in that case.
/// - An emit error from the consumer is returned after the producer finishes.
/// - If the consumer task panics or is cancelled, a `CliError::NetworkError` stating which
///   of the two happened is returned.
async fn execute_pipeline_streaming(
    config: Config,
    cancellation: CancellationToken,
) -> Result<PipelineResult, CliError> {
    use crate::types::OutputFormat;
    use tokio::sync::mpsc;
    let format = config.format;
    let output_file = config.output_file.clone();
    let queries = config.queries.clone();
    let paralelismo = config.parallelism.max(1) as usize;
    // Bounded: two slots per worker (minimum 2) so output does not queue without limit.
    let (tx, mut rx) = mpsc::channel::<(usize, SearchOutput)>(paralelismo.saturating_mul(2).max(2));
    let consumer = tokio::spawn(async move {
        let mut emitidos: u64 = 0;
        while let Some((index, output)) = rx.recv().await {
            // `Auto` resolves to JSON (one NDJSON line per item) in streaming mode.
            let res = match format {
                OutputFormat::Auto | OutputFormat::Json => {
                    crate::output::emit_ndjson(&output, output_file.as_deref())
                }
                OutputFormat::Text => {
                    crate::output::emit_stream_text(index, &output, output_file.as_deref())
                }
                OutputFormat::Markdown => {
                    crate::output::emit_stream_markdown(index, &output, output_file.as_deref())
                }
            };
            if let Err(erro) = res {
                // The reader went away (e.g. `| head`): stop emitting without failing the run.
                if crate::output::is_broken_pipe(&erro) {
                    tracing::debug!("BrokenPipe in streaming — stopping consumer");
                    return Ok(());
                }
                tracing::error!(?erro, "failed to emit streaming item — aborting consumer");
                return Err(erro);
            }
            emitidos = emitidos.saturating_add(1);
        }
        tracing::info!(emitidos, "streaming consumer finished");
        Ok::<(), CliError>(())
    });
    let stats =
        parallel::execute_parallel_searches_streaming(queries, config, cancellation, tx).await?;
    match consumer.await {
        Ok(Ok(())) => Ok(PipelineResult::Stream(stats)),
        Ok(Err(erro)) => Err(erro),
        Err(erro_join) => {
            // A JoinError is either a panic or a cancellation; report the one that happened.
            let message = if erro_join.is_panic() {
                tracing::error!(?erro_join, "streaming consumer panicked");
                format!("streaming consumer panicked: {erro_join}")
            } else {
                tracing::warn!(?erro_join, "streaming consumer cancelled");
                format!("streaming consumer cancelled: {erro_join}")
            };
            Err(CliError::NetworkError { message })
        }
    }
}
/// Runs one DuckDuckGo search for `cfg.query` and returns its aggregated output.
///
/// The steps are:
/// 1. Build an HTTP client that honours the configured proxy settings.
/// 2. Page through results with [`search::search_with_pagination`].
/// 3. Enrich the output via [`content_fetch::enrich_with_content`].
///
/// A failed search is *not* reported as `Err`. It becomes a [`SearchOutput`] whose
/// `error`/`message` fields are set (built by `failure_output`).
///
/// # Errors
/// Returns `Err` only when the HTTP client cannot be built.
///
/// NOTE(review): `execution_time_ms` is captured before content enrichment, so it does not
/// include time spent in `enrich_with_content` — confirm this is intended.
pub async fn execute_single_search(
    cfg: &Config,
    cancellation: &CancellationToken,
) -> Result<SearchOutput, CliError> {
    let start = Instant::now();
    let proxy = ProxyConfig::from_options(cfg.proxy.as_deref(), cfg.no_proxy);
    let client = http::build_client_with_proxy(
        &cfg.browser_profile,
        cfg.timeout_seconds,
        &cfg.language,
        &cfg.country,
        &proxy,
    )?;
    tracing::info!(query = %cfg.query, endpoint = cfg.endpoint.as_str(), "Executing search");

    let rate_limited = Arc::new(AtomicBool::new(false));
    let outcome =
        search::search_with_pagination(&client, cfg, &cfg.query, &rate_limited, cancellation)
            .await;
    let aggregate = match outcome {
        Ok(aggregate) => aggregate,
        Err(reason) => return Ok(failure_output(cfg, &reason, start)),
    };

    let result_count = u32::try_from(aggregate.results.len()).unwrap_or(u32::MAX);
    let selectors_hash = calculate_selectors_hash(&cfg.selectors);
    let execution_time_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
    let timestamp = chrono::Utc::now().to_rfc3339();

    let metadata = SearchMetadata {
        execution_time_ms,
        selectors_hash,
        // `attempts` counts the first try too, so retries are one fewer.
        retries: aggregate.attempts.saturating_sub(1),
        used_fallback_endpoint: aggregate.used_fallback_lite,
        concurrent_fetches: 0,
        fetch_successes: 0,
        fetch_failures: 0,
        used_chrome: false,
        user_agent: cfg.user_agent.clone(),
        used_proxy: proxy.is_active(),
        identity_used: None,
        cascade_level: None,
    };
    let mut output = SearchOutput {
        query: cfg.query.clone(),
        engine: "duckduckgo".to_string(),
        endpoint: aggregate.effective_endpoint.as_str().to_string(),
        timestamp,
        region: search::format_kl(&cfg.language, &cfg.country),
        result_count,
        results: aggregate.results,
        pages_fetched: aggregate.pages_fetched,
        error: None,
        message: None,
        metadata,
    };

    content_fetch::enrich_with_content(&mut output, &client, cfg, cancellation).await;
    tracing::info!(
        total = output.result_count,
        pages = output.pages_fetched,
        fallback = output.metadata.used_fallback_endpoint,
        fetch_content = cfg.fetch_content,
        fetch_successes = output.metadata.fetch_successes,
        "Search completed successfully"
    );
    Ok(output)
}
/// Builds the [`SearchOutput`] returned when a search gives up.
///
/// The output has no results, and `error`/`message` are taken from `reason`.
/// `endpoint` is the configured `cfg.endpoint`, because no effective endpoint is known
/// on this path.
///
/// NOTE(review): `metadata.retries` is set to `cfg.retries`, the configured retry budget.
/// This may differ from the retries actually performed if the search stopped early;
/// confirm.
#[cold]
fn failure_output(cfg: &Config, reason: &search::RetryFailReason, start: Instant) -> SearchOutput {
    let execution_time_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
    let timestamp = chrono::Utc::now().to_rfc3339();
    let selectors_hash = calculate_selectors_hash(&cfg.selectors);
    let used_proxy = ProxyConfig::from_options(cfg.proxy.as_deref(), cfg.no_proxy).is_active();

    let metadata = SearchMetadata {
        execution_time_ms,
        selectors_hash,
        retries: cfg.retries,
        used_fallback_endpoint: false,
        concurrent_fetches: 0,
        fetch_successes: 0,
        fetch_failures: 0,
        used_chrome: false,
        user_agent: cfg.user_agent.clone(),
        used_proxy,
        identity_used: None,
        cascade_level: None,
    };
    SearchOutput {
        query: cfg.query.clone(),
        engine: "duckduckgo".to_string(),
        endpoint: cfg.endpoint.as_str().to_string(),
        timestamp,
        region: search::format_kl(&cfg.language, &cfg.country),
        result_count: 0,
        results: Vec::new(),
        pages_fetched: 0,
        error: Some(reason.as_error_code().to_string()),
        message: Some(reason.message()),
        metadata,
    }
}
/// Convenience wrapper around [`execute_single_search`] for callers that need no cancellation.
///
/// A fresh [`CancellationToken`] is supplied, and this function never cancels it.
pub async fn execute(cfg: &Config) -> Result<SearchOutput, CliError> {
    let token = CancellationToken::new();
    execute_single_search(cfg, &token).await
}
/// Merges queries from positional arguments, a file, and stdin, in that order.
///
/// Each entry is trimmed, and entries that are empty after trimming are dropped. Only the
/// first occurrence of each trimmed query is kept, so the result preserves first-seen
/// order across all three sources.
pub fn combine_and_dedup_queries(
    posicionais: Vec<String>,
    de_arquivo: Vec<String>,
    de_stdin: Vec<String>,
) -> Vec<String> {
    let total = posicionais.len() + de_arquivo.len() + de_stdin.len();
    let mut seen: HashSet<String> = HashSet::with_capacity(total);
    let mut merged: Vec<String> = Vec::with_capacity(total);
    for candidate in IntoIterator::into_iter([posicionais, de_arquivo, de_stdin]).flatten() {
        let query = candidate.trim();
        // `contains` takes a borrowed `&str`, so blanks and duplicates cost no allocation.
        if query.is_empty() || seen.contains(query) {
            continue;
        }
        seen.insert(query.to_owned());
        merged.push(query.to_owned());
    }
    merged
}
/// Reads one query per line from the file at `path`.
///
/// Each line is trimmed, so `\r\n` line endings and surrounding whitespace are removed.
/// Blank lines are skipped. A UTF-8 byte-order mark at the start of the file is stripped.
/// `trim` does not remove U+FEFF, so without this step a BOM-prefixed file (common for
/// files saved on Windows) would glue an invisible character onto the first query.
///
/// # Errors
/// Returns [`CliError::PathError`] if the file cannot be opened or a line cannot be read
/// (for example, because it is not valid UTF-8).
pub fn read_queries_from_file(path: &Path) -> Result<Vec<String>, CliError> {
    use std::io::BufRead;
    let file = std::fs::File::open(path).map_err(|e| CliError::PathError {
        message: format!("failed to open query file {}: {e}", path.display()),
    })?;
    let reader = std::io::BufReader::new(file);
    let mut lines_vec: Vec<String> = Vec::with_capacity(20);
    for (index, line) in reader.lines().enumerate() {
        let line = line.map_err(|e| CliError::PathError {
            message: format!(
                "failed to read line {} of {}: {e}",
                index + 1,
                path.display()
            ),
        })?;
        // Only the very first line can carry the file's byte-order mark.
        let content = if index == 0 {
            line.strip_prefix('\u{feff}').unwrap_or(line.as_str())
        } else {
            line.as_str()
        };
        let trimmed = content.trim();
        if !trimmed.is_empty() {
            lines_vec.push(trimmed.to_owned());
        }
    }
    Ok(lines_vec)
}
/// Reads one query per line from stdin, but only when stdin is not a terminal.
///
/// When stdin is a terminal, an empty list is returned, so an interactive session never
/// blocks waiting for input. Otherwise each line is trimmed and blank lines are skipped.
/// A UTF-8 byte-order mark at the start of the stream is stripped. Piping a BOM-prefixed
/// file would otherwise glue U+FEFF onto the first query, and `trim` does not remove it.
///
/// # Errors
/// Returns [`CliError::PathError`] if a line cannot be read (for example, because it is
/// not valid UTF-8).
pub fn read_queries_from_stdin_if_pipe() -> Result<Vec<String>, CliError> {
    use std::io::{BufRead, IsTerminal};
    let stdin = std::io::stdin();
    if stdin.is_terminal() {
        return Ok(Vec::new());
    }
    let mut lines_vec: Vec<String> = Vec::with_capacity(20);
    for (index, line) in stdin.lock().lines().enumerate() {
        let line = line.map_err(|e| CliError::PathError {
            message: format!("failed to read line {} from stdin: {e}", index + 1),
        })?;
        // Only the very first line can carry the stream's byte-order mark.
        let content = if index == 0 {
            line.strip_prefix('\u{feff}').unwrap_or(line.as_str())
        } else {
            line.as_str()
        };
        let trimmed = content.trim();
        if !trimmed.is_empty() {
            lines_vec.push(trimmed.to_owned());
        }
    }
    Ok(lines_vec)
}
/// Fingerprints a selector configuration for search metadata.
///
/// Returns the first 16 hex characters of the BLAKE3 hash of the TOML-serialized config.
/// If serialization fails, a warning is logged and `"unknown"` is returned.
pub(crate) fn calculate_selectors_hash(cfg: &SelectorConfig) -> String {
    let serialized = match toml::to_string(cfg) {
        Ok(text) => text,
        Err(err) => {
            tracing::warn!(?err, "failed to serialize selector config for hash");
            return "unknown".to_string();
        }
    };
    let digest = blake3::hash(serialized.as_bytes());
    let hex = digest.to_hex();
    hex.chars().take(16).collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal successful `SearchOutput`; only `result_count` varies between tests.
    fn sample_output(result_count: u32) -> SearchOutput {
        SearchOutput {
            query: "q".into(),
            engine: "duckduckgo".into(),
            endpoint: "html".into(),
            timestamp: "t".into(),
            region: "br-pt".into(),
            result_count,
            results: vec![],
            pages_fetched: 1,
            error: None,
            message: None,
            metadata: SearchMetadata {
                execution_time_ms: 0,
                selectors_hash: "x".into(),
                retries: 0,
                used_fallback_endpoint: false,
                concurrent_fetches: 0,
                fetch_successes: 0,
                fetch_failures: 0,
                used_chrome: false,
                user_agent: "ua".into(),
                used_proxy: false,
                identity_used: None,
                cascade_level: None,
            },
        }
    }

    #[test]
    fn calculate_selectors_hash_returns_16_chars() {
        let cfg = SelectorConfig::default();
        let hash = calculate_selectors_hash(&cfg);
        assert_eq!(hash.len(), 16);
        assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn calculate_selectors_hash_is_deterministic() {
        let cfg = SelectorConfig::default();
        let h1 = calculate_selectors_hash(&cfg);
        let h2 = calculate_selectors_hash(&cfg);
        assert_eq!(h1, h2);
    }

    #[test]
    fn combinar_deduplica_preservando_ordem_da_primeira_ocorrencia() {
        let posicionais = vec!["alfa".to_string(), "beta".to_string()];
        let de_arquivo = vec!["beta".to_string(), "gama".to_string()];
        let de_stdin = vec!["alfa".to_string(), "delta".to_string()];
        let combinado = combine_and_dedup_queries(posicionais, de_arquivo, de_stdin);
        assert_eq!(
            combinado,
            vec!["alfa", "beta", "gama", "delta"],
            "ordem deve ser da primeira ocorrência; duplicatas devem ser removidas"
        );
    }

    #[test]
    fn combinar_remove_strings_vazias_e_apenas_espacos() {
        let posicionais = vec!["   ".to_string(), "rust".to_string(), "".to_string()];
        let de_arquivo = vec!["\t\t".to_string(), "tokio".to_string()];
        let de_stdin = vec![];
        let combinado = combine_and_dedup_queries(posicionais, de_arquivo, de_stdin);
        assert_eq!(combinado, vec!["rust", "tokio"]);
    }

    #[test]
    fn combine_trims_whitespace_before_comparing() {
        let posicionais = vec!["  alfa  ".to_string()];
        let de_arquivo = vec!["alfa".to_string()];
        let de_stdin = vec!["alfa\t".to_string()];
        let combinado = combine_and_dedup_queries(posicionais, de_arquivo, de_stdin);
        assert_eq!(
            combinado,
            vec!["alfa"],
            "queries equivalentes após trim devem ser deduplicadas"
        );
    }

    #[test]
    fn combine_empty_returns_empty() {
        let combinado = combine_and_dedup_queries(vec![], vec![], vec![]);
        assert!(combinado.is_empty());
    }

    #[test]
    fn read_queries_from_file_accepts_windows_lines_and_empty() {
        use std::io::Write;
        // Per-process directory so concurrent test runs on one machine cannot clobber
        // each other's fixture.
        let dir = std::env::temp_dir().join(format!(
            "ddg_cli_iter2_queries_test_{}",
            std::process::id()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("queries.txt");
        let content = "rust\r\ntokio\r\n\r\n  axum  \n\nhttp://exemplo.com\n";
        let mut file = std::fs::File::create(&path).unwrap();
        file.write_all(content.as_bytes()).unwrap();
        drop(file);
        let lines = read_queries_from_file(&path).expect("should read file");
        // Remove the whole fixture directory before asserting so a failed assertion
        // does not leave it behind.
        let _ = std::fs::remove_dir_all(&dir);
        assert_eq!(lines, vec!["rust", "tokio", "axum", "http://exemplo.com"]);
    }

    #[test]
    fn total_results_in_single_output() {
        let output = sample_output(7);
        assert_eq!(PipelineResult::Single(Box::new(output)).total_results(), 7);
    }

    #[test]
    fn total_results_in_multi_output_sums_all() {
        let multi = MultiSearchOutput {
            query_count: 3,
            timestamp: "t".into(),
            parallelism: 3,
            searches: vec![sample_output(2), sample_output(5), sample_output(0)],
        };
        assert_eq!(PipelineResult::Multi(Box::new(multi)).total_results(), 7);
    }
}