//! Parallel processing support for the CLI: extractor configuration, worker
//! setup, and conversion of library match results into CLI output.

use anyhow::{Context, Result};
use serde_json::json;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use super::stats::ProcessingStats;
use crate::cli_utils::{data_value_to_json, format_cidr_into};

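/// Extractor selection parsed from the CLI extractor argument: explicit
/// per-extractor enables/disables layered over defaults.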
#[derive(Debug, Clone, Default)]
pub struct ExtractorConfig {
    pub overrides: HashMap<String, bool>,
    has_enables: bool,
}

impl ExtractorConfig {
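    /// Parse a comma-separated extractor list. A leading `-` disables an
    /// extractor, anything else enables it; aliases such as `crypto` and
    /// `ips` are expanded to concrete extractor names.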
    pub fn from_arg(arg: Option<&str>) -> Self {
        let mut overrides = HashMap::new();
        let mut has_enables = false;
        if let Some(extractors_str) = arg {
            for extractor in extractors_str.split(',') {
                let extractor = extractor.trim();
                let (is_disable, name) = if let Some(name) = extractor.strip_prefix('-') {
                    (true, name)
                } else {
                    (false, extractor)
                };
                if !is_disable {
                    has_enables = true;
                }
                let names = Self::expand_alias(name);
                for n in names {
                    overrides.insert(n.to_string(), !is_disable);
                }
            }
        }
        Self {
            overrides,
            has_enables,
        }
    }

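    /// Map user-facing aliases to the concrete extractor names they cover.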
    fn expand_alias(name: &str) -> Vec<&str> {
        match name {
            "crypto" => vec!["bitcoin", "ethereum", "monero"],
            "ip" | "ips" => vec!["ipv4", "ipv6"],
            "domains" => vec!["domain"],
            "emails" => vec!["email"],
            "hashes" => vec!["hash"],
            _ => vec![name],
        }
    }

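    /// Decide whether the named extractor should run: an explicit override
    /// wins, otherwise the caller-supplied default applies.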
    pub fn should_enable(&self, name: &str, default: bool) -> bool {
        self.overrides.get(name).copied().unwrap_or(default)
    }

    pub fn has_explicit_enables(&self) -> bool {
        self.has_enables
    }
}

pub use matchy::processing::DataBatch;

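/// Choose a worker count from the machine's available parallelism, falling
/// back to 4 when it cannot be determined (and never returning 0).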
fn auto_tune_worker_count() -> usize {
    std::thread::available_parallelism()
        .map(std::num::NonZero::get)
        .unwrap_or(4)
        .max(1)
}

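/// A single match result, flattened into the fields the CLI output needs.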
pub struct CliMatchResult {
    pub source_file: PathBuf,
    pub matched_text: String,
    pub match_type: String,
    pub timestamp: f64,
    pub pattern_count: Option<usize>,
    pub data: Option<serde_json::Value>,
    pub prefix_len: Option<u8>,
    pub cidr: Option<String>,
}

pub use matchy::processing::WorkerStats;

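/// Run extraction and database lookups over `inputs` on a parallel
/// reader/worker pipeline, print the collected matches (JSON output only),
/// and return aggregate statistics together with the actual worker and
/// reader counts and the routing statistics.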
#[allow(clippy::too_many_arguments)]
pub fn process_parallel(
    inputs: &[PathBuf],
    db: Arc<matchy::Database>,
    num_threads: usize,
    explicit_readers: Option<usize>,
    _batch_bytes: usize,
    output_format: &str,
    _show_stats: bool,
    show_progress: bool,
    overall_start: Instant,
    extractor_config: &ExtractorConfig,
    debug_routing: bool,
) -> Result<(
    ProcessingStats,
    usize,
    usize,
    matchy::processing::RoutingStats,
)> {
    // A thread count of 0 means "auto-tune from available CPU parallelism".
    let num_workers = if num_threads == 0 {
        auto_tune_worker_count()
    } else {
        num_threads
    };
    let num_readers_opt = explicit_readers;
    let output_json = output_format == "json";
    let progress_reporter = if show_progress {
        Some(Arc::new(Mutex::new(
            crate::match_processor::ProgressReporter::new(),
        )))
    } else {
        None
    };
    let ext_config = extractor_config.clone();
    let result = matchy::processing::process_files_parallel(
        inputs,
        num_readers_opt,
        Some(num_workers),
        // Worker factory: every worker gets its own extractor over the shared database.
        move || {
            let db_clone = Arc::clone(&db);
            let extractor = create_extractor_for_db(&db_clone, &ext_config)
                .map_err(|e| format!("Extractor init failed: {e}"))?;
            let worker = matchy::processing::Worker::builder()
                .extractor(extractor)
                .add_database("default", db_clone)
                .build();
            Ok::<_, String>(worker)
        },
        // Optional progress callback; update frequency is throttled by the reporter.
        progress_reporter.map(|pr| {
            move |stats: &matchy::processing::WorkerStats| {
                let mut reporter = pr.lock().unwrap();
                if reporter.should_update() {
                    let mut ps = ProcessingStats::new();
                    ps.lines_processed = stats.lines_processed;
                    ps.candidates_tested = stats.candidates_tested;
                    ps.total_matches = stats.matches_found;
                    ps.total_bytes = stats.total_bytes;
                    reporter.show(&ps, overall_start.elapsed());
                }
            }
        }),
        debug_routing,
    )
    .map_err(|e| anyhow::anyhow!("Parallel processing failed: {e}"))?;
    // Finish the progress line before printing matches.
    if show_progress {
        eprintln!();
    }
    for lib_match in &result.matches {
        if let Some(cli_match) = library_match_to_cli_match(lib_match) {
            output_cli_match(&cli_match, output_json)?;
        }
    }
    let mut aggregate = ProcessingStats::new();
    aggregate.lines_processed = result.worker_stats.lines_processed;
    aggregate.candidates_tested = result.worker_stats.candidates_tested;
    aggregate.total_matches = result.worker_stats.matches_found;
    aggregate.total_bytes = result.worker_stats.total_bytes;
    aggregate.extraction_time = result.worker_stats.extraction_time;
    aggregate.extraction_samples = result.worker_stats.extraction_samples;
    aggregate.lookup_time = result.worker_stats.lookup_time;
    aggregate.lookup_samples = result.worker_stats.lookup_samples;
    aggregate.ipv4_count = result.worker_stats.ipv4_count;
    aggregate.ipv6_count = result.worker_stats.ipv6_count;
    aggregate.domain_count = result.worker_stats.domain_count;
    aggregate.email_count = result.worker_stats.email_count;
    Ok((
        aggregate,
        result.actual_workers,
        result.actual_readers,
        result.routing_stats,
    ))
}

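/// A message from a processing worker: either a single match or a snapshot
/// of that worker's statistics.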
pub enum WorkerMessage {
    Match(CliMatchResult),
    Stats {
        worker_id: usize,
        stats: WorkerStats,
    },
}

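/// Build an extractor for `db`: by default each extractor is enabled only
/// when the database holds data it could match (IP data for the IP
/// extractors, literal/glob data for the rest); explicit CLI overrides win.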
pub fn create_extractor_for_db(
    db: &matchy::Database,
    config: &ExtractorConfig,
) -> Result<matchy::extractor::Extractor> {
    use matchy::extractor::Extractor;
    let has_ip = db.has_ip_data();
    let has_strings = db.has_literal_data() || db.has_glob_data();
    // If the user enabled anything explicitly, all defaults are off; otherwise
    // default each extractor on only when the database has data it could hit.
    let use_defaults = !config.has_explicit_enables();
    let default_ipv4 = use_defaults && has_ip;
    let default_ipv6 = use_defaults && has_ip;
    let default_domains = use_defaults && has_strings;
    let default_emails = use_defaults && has_strings;
    let default_hashes = use_defaults && has_strings;
    let default_bitcoin = use_defaults && has_strings;
    let default_ethereum = use_defaults && has_strings;
    let default_monero = use_defaults && has_strings;
    Extractor::builder()
        .extract_ipv4(config.should_enable("ipv4", default_ipv4))
        .extract_ipv6(config.should_enable("ipv6", default_ipv6))
        .extract_domains(config.should_enable("domain", default_domains))
        .extract_emails(config.should_enable("email", default_emails))
        .extract_hashes(config.should_enable("hash", default_hashes))
        .extract_bitcoin(config.should_enable("bitcoin", default_bitcoin))
        .extract_ethereum(config.should_enable("ethereum", default_ethereum))
        .extract_monero(config.should_enable("monero", default_monero))
        .build()
        .context("Failed to create extractor")
}

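/// Reusable scratch buffers for building match results without fresh
/// allocations on every match.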
pub struct MatchBuffers {
    data_values: Vec<serde_json::Value>,
    matched_text: String,
    cidr: String,
}

impl MatchBuffers {
    pub fn new() -> Self {
        Self {
            data_values: Vec::with_capacity(8),
            matched_text: String::with_capacity(256),
            cidr: String::with_capacity(64),
        }
    }
}

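/// Convert a library match into a `CliMatchResult`, returning `None` for
/// `QueryResult::NotFound`.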
fn library_match_to_cli_match(
    lib_match: &matchy::processing::MatchResult,
) -> Option<CliMatchResult> {
    use matchy::QueryResult;
    match &lib_match.result {
        QueryResult::Ip {
            data, prefix_len, ..
        } => {
            let mut cidr = String::new();
            format_cidr_into(&lib_match.matched_text, *prefix_len, &mut cidr);
            Some(CliMatchResult {
                source_file: lib_match.source.clone(),
                matched_text: lib_match.matched_text.clone(),
                match_type: "ip".to_string(),
                timestamp: 0.0,
                pattern_count: None,
                data: Some(data_value_to_json(data)),
                prefix_len: Some(*prefix_len),
                cidr: Some(cidr),
            })
        }
        QueryResult::Pattern {
            pattern_ids, data, ..
        } => {
            let data_values: Vec<_> = data
                .iter()
                .filter_map(|opt_dv| opt_dv.as_ref().map(data_value_to_json))
                .collect();
            Some(CliMatchResult {
                source_file: lib_match.source.clone(),
                matched_text: lib_match.matched_text.clone(),
                match_type: "pattern".to_string(),
                timestamp: 0.0,
                pattern_count: Some(pattern_ids.len()),
                data: if data_values.is_empty() {
                    None
                } else {
                    Some(serde_json::Value::Array(data_values))
                },
                prefix_len: None,
                cidr: None,
            })
        }
        QueryResult::NotFound => None,
    }
}

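/// Emit a match as a single JSON line on stdout. Other output formats
/// produce no output from this function.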
fn output_cli_match(result: &CliMatchResult, output_json: bool) -> Result<()> {
    if output_json {
        let mut match_obj = json!({
            "timestamp": format!("{:.3}", result.timestamp),
            "source": result.source_file.display().to_string(),
            "matched_text": result.matched_text,
            "match_type": result.match_type,
        });
        if let Some(pattern_count) = result.pattern_count {
            match_obj["pattern_count"] = json!(pattern_count);
        }
        if let Some(ref data) = result.data {
            match_obj["data"] = data.clone();
        }
        if let Some(prefix_len) = result.prefix_len {
            match_obj["prefix_len"] = json!(prefix_len);
        }
        if let Some(ref cidr) = result.cidr {
            match_obj["cidr"] = json!(cidr);
        }
        println!("{}", serde_json::to_string(&match_obj)?);
    }
    Ok(())
}

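/// Buffer-reusing variant of `library_match_to_cli_match`: clears the
/// caller-provided `MatchBuffers` and builds the CIDR string in place.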
pub fn build_match_result(
    lib_match: &matchy::processing::MatchResult,
    match_buffers: &mut MatchBuffers,
) -> Option<CliMatchResult> {
    use matchy::QueryResult;
    match_buffers.data_values.clear();
    match_buffers.matched_text.clear();
    match_buffers.cidr.clear();
    match &lib_match.result {
        QueryResult::Ip {
            data, prefix_len, ..
        } => {
            format_cidr_into(
                &lib_match.matched_text,
                *prefix_len,
                &mut match_buffers.cidr,
            );
            Some(CliMatchResult {
                source_file: lib_match.source.clone(),
                matched_text: lib_match.matched_text.clone(),
                match_type: "ip".to_string(),
                timestamp: 0.0,
                pattern_count: None,
                data: Some(data_value_to_json(data)),
                prefix_len: Some(*prefix_len),
                cidr: Some(match_buffers.cidr.clone()),
            })
        }
        QueryResult::Pattern {
            pattern_ids, data, ..
        } => {
            let data_values: Vec<_> = data
                .iter()
                .filter_map(|opt_dv| opt_dv.as_ref().map(data_value_to_json))
                .collect();
            Some(CliMatchResult {
                source_file: lib_match.source.clone(),
                matched_text: lib_match.matched_text.clone(),
                match_type: "pattern".to_string(),
                timestamp: 0.0,
                pattern_count: Some(pattern_ids.len()),
                data: if data_values.is_empty() {
                    None
                } else {
                    Some(serde_json::Value::Array(data_values))
                },
                prefix_len: None,
                cidr: None,
            })
        }
        QueryResult::NotFound => None,
    }
}