1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::time::sleep;
8use tracing::{debug, info, instrument};
9
10use crate::availability::{AvailabilityChecker, AvailabilityResult};
11use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
12use crate::error::Result;
13use crate::lookup::{LookupResult, SmartLookup};
14use crate::rdap::{RdapClient, RdapResponse};
15use crate::ssl::{SslChecker, SslReport};
16use crate::status::{StatusClient, StatusResponse};
17use crate::whois::{WhoisClient, WhoisResponse};
18
/// Progress hook for [`BulkExecutor::execute`].
///
/// Invoked once after each operation finishes with
/// `(operations_completed_so_far, total_operations, domain_of_the_finished_operation)`.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync>;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "type", rename_all = "snake_case")]
24pub enum BulkOperation {
25 Whois {
26 domain: String,
27 },
28 Rdap {
29 domain: String,
30 },
31 Dns {
32 domain: String,
33 record_type: RecordType,
34 },
35 Propagation {
36 domain: String,
37 record_type: RecordType,
38 },
39 Lookup {
40 domain: String,
41 },
42 Status {
43 domain: String,
44 },
45 Avail {
46 domain: String,
47 },
48 Info {
49 domain: String,
50 },
51 Ssl {
52 domain: String,
53 },
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
59pub enum BulkResultData {
60 Whois(WhoisResponse),
61 Rdap(Box<RdapResponse>),
62 Dns(Vec<DnsRecord>),
63 Propagation(PropagationResult),
64 Lookup(LookupResult),
65 Status(StatusResponse),
66 Avail(AvailabilityResult),
67 Info(crate::domain_info::DomainInfo),
68 Ssl(SslReport),
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct BulkResult {
74 pub operation: BulkOperation,
75 pub success: bool,
76 pub data: Option<BulkResultData>,
77 pub error: Option<String>,
78 pub duration_ms: u64,
79}
80
81#[derive(Debug, Clone)]
83pub struct BulkExecutor {
84 concurrency: usize,
85 rate_limit_delay: Duration,
86 whois_client: WhoisClient,
87 rdap_client: RdapClient,
88 dns_resolver: DnsResolver,
89 propagation_checker: PropagationChecker,
90 smart_lookup: SmartLookup,
91 status_client: StatusClient,
92 availability_checker: AvailabilityChecker,
93 ssl_checker: SslChecker,
94}
95
96impl Default for BulkExecutor {
97 fn default() -> Self {
98 Self::new()
99 }
100}
101
102impl BulkExecutor {
103 pub fn new() -> Self {
104 Self {
105 concurrency: 10,
106 rate_limit_delay: Duration::from_millis(100),
107 whois_client: WhoisClient::new(),
108 rdap_client: RdapClient::new(),
109 dns_resolver: DnsResolver::new(),
110 propagation_checker: PropagationChecker::new(),
111 smart_lookup: SmartLookup::new(),
112 status_client: StatusClient::new(),
113 availability_checker: AvailabilityChecker::new(),
114 ssl_checker: SslChecker::new(),
115 }
116 }
117
118 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
119 self.concurrency = concurrency.clamp(1, 50);
120 self
121 }
122
123 pub fn with_rate_limit(mut self, delay: Duration) -> Self {
124 self.rate_limit_delay = delay;
125 self
126 }
127
128 #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
129 pub async fn execute(
130 &self,
131 operations: Vec<BulkOperation>,
132 progress: Option<ProgressCallback>,
133 ) -> Vec<BulkResult> {
134 let total = operations.len();
135 let completed = Arc::new(AtomicUsize::new(0));
136
137 info!("bulk operation started");
138 debug!(
139 total = total,
140 concurrency = self.concurrency,
141 "Starting bulk execution"
142 );
143
144 let results: Vec<BulkResult> = stream::iter(operations)
145 .map(|op| {
146 let completed = completed.clone();
147 let progress = progress.as_ref();
148 let rate_limit_delay = self.rate_limit_delay;
149 let whois_client = &self.whois_client;
150 let rdap_client = &self.rdap_client;
151 let dns_resolver = &self.dns_resolver;
152 let propagation_checker = &self.propagation_checker;
153 let smart_lookup = &self.smart_lookup;
154 let status_client = &self.status_client;
155 let availability_checker = &self.availability_checker;
156 let ssl_checker = &self.ssl_checker;
157
158 async move {
159 if !rate_limit_delay.is_zero() {
161 sleep(rate_limit_delay).await;
162 }
163
164 let start = std::time::Instant::now();
165 let result = execute_operation(
166 &op,
167 &Clients {
168 whois: whois_client,
169 rdap: rdap_client,
170 dns: dns_resolver,
171 propagation: propagation_checker,
172 lookup: smart_lookup,
173 status: status_client,
174 avail: availability_checker,
175 ssl: ssl_checker,
176 },
177 )
178 .await;
179 let duration_ms = start.elapsed().as_millis() as u64;
180
181 let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
182
183 if let Some(progress) = progress {
184 let desc = match &op {
185 BulkOperation::Whois { domain }
186 | BulkOperation::Rdap { domain }
187 | BulkOperation::Dns { domain, .. }
188 | BulkOperation::Propagation { domain, .. }
189 | BulkOperation::Lookup { domain }
190 | BulkOperation::Status { domain }
191 | BulkOperation::Avail { domain }
192 | BulkOperation::Info { domain }
193 | BulkOperation::Ssl { domain } => domain.as_str(),
194 };
195 progress(count, total, desc);
196 }
197
198 match result {
199 Ok(data) => BulkResult {
200 operation: op,
201 success: true,
202 data: Some(data),
203 error: None,
204 duration_ms,
205 },
206 Err(e) => {
207 debug!(error = %e, "Bulk operation failed");
208 BulkResult {
209 operation: op,
210 success: false,
211 data: None,
212 error: Some(e.to_string()),
213 duration_ms,
214 }
215 }
216 }
217 }
218 })
219 .buffer_unordered(self.concurrency)
220 .collect()
221 .await;
222
223 let succeeded = results.iter().filter(|r| r.success).count();
224 let failed = results.iter().filter(|r| !r.success).count();
225 info!(
226 total = total,
227 succeeded = succeeded,
228 failed = failed,
229 "bulk operation completed"
230 );
231
232 results
233 }
234
235 pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
236 let operations = domains
237 .into_iter()
238 .map(|domain| BulkOperation::Whois { domain })
239 .collect();
240 self.execute(operations, None).await
241 }
242
243 pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
244 let operations = domains
245 .into_iter()
246 .map(|domain| BulkOperation::Rdap { domain })
247 .collect();
248 self.execute(operations, None).await
249 }
250
251 pub async fn execute_dns(
252 &self,
253 domains: Vec<String>,
254 record_type: RecordType,
255 ) -> Vec<BulkResult> {
256 let operations = domains
257 .into_iter()
258 .map(|domain| BulkOperation::Dns {
259 domain,
260 record_type,
261 })
262 .collect();
263 self.execute(operations, None).await
264 }
265
266 pub async fn execute_propagation(
267 &self,
268 domains: Vec<String>,
269 record_type: RecordType,
270 ) -> Vec<BulkResult> {
271 let operations = domains
272 .into_iter()
273 .map(|domain| BulkOperation::Propagation {
274 domain,
275 record_type,
276 })
277 .collect();
278 self.execute(operations, None).await
279 }
280
281 pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
282 let operations = domains
283 .into_iter()
284 .map(|domain| BulkOperation::Lookup { domain })
285 .collect();
286 self.execute(operations, None).await
287 }
288
289 pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
290 let operations = domains
291 .into_iter()
292 .map(|domain| BulkOperation::Status { domain })
293 .collect();
294 self.execute(operations, None).await
295 }
296
297 pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
298 let operations = domains
299 .into_iter()
300 .map(|domain| BulkOperation::Avail { domain })
301 .collect();
302 self.execute(operations, None).await
303 }
304
305 pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
306 let operations = domains
307 .into_iter()
308 .map(|domain| BulkOperation::Info { domain })
309 .collect();
310 self.execute(operations, None).await
311 }
312
313 pub async fn execute_ssl(&self, domains: Vec<String>) -> Vec<BulkResult> {
314 let operations = domains
315 .into_iter()
316 .map(|domain| BulkOperation::Ssl { domain })
317 .collect();
318 self.execute(operations, None).await
319 }
320}
321
322struct Clients<'a> {
323 whois: &'a WhoisClient,
324 rdap: &'a RdapClient,
325 dns: &'a DnsResolver,
326 propagation: &'a PropagationChecker,
327 lookup: &'a SmartLookup,
328 status: &'a StatusClient,
329 avail: &'a AvailabilityChecker,
330 ssl: &'a SslChecker,
331}
332
333async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
334 match op {
335 BulkOperation::Whois { domain } => {
336 let result = clients.whois.lookup(domain).await?;
337 Ok(BulkResultData::Whois(result))
338 }
339 BulkOperation::Rdap { domain } => {
340 let result = clients.rdap.lookup_domain(domain).await?;
341 Ok(BulkResultData::Rdap(Box::new(result)))
342 }
343 BulkOperation::Dns {
344 domain,
345 record_type,
346 } => {
347 let result = clients.dns.resolve(domain, *record_type, None).await?;
348 Ok(BulkResultData::Dns(result))
349 }
350 BulkOperation::Propagation {
351 domain,
352 record_type,
353 } => {
354 let result = clients.propagation.check(domain, *record_type).await?;
355 Ok(BulkResultData::Propagation(result))
356 }
357 BulkOperation::Lookup { domain } => {
358 let result = clients.lookup.lookup(domain).await?;
359 Ok(BulkResultData::Lookup(result))
360 }
361 BulkOperation::Status { domain } => {
362 let result = clients.status.check(domain).await?;
363 Ok(BulkResultData::Status(result))
364 }
365 BulkOperation::Avail { domain } => {
366 let result = clients.avail.check(domain).await?;
367 Ok(BulkResultData::Avail(result))
368 }
369 BulkOperation::Info { domain } => {
370 let result = clients.lookup.lookup(domain).await?;
371 Ok(BulkResultData::Info(
372 crate::domain_info::DomainInfo::from_lookup_result(&result),
373 ))
374 }
375 BulkOperation::Ssl { domain } => {
376 let result = clients.ssl.check(domain).await?;
377 Ok(BulkResultData::Ssl(result))
378 }
379 }
380}
381
/// Column labels that mark the first data row of a CSV file as a header.
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// Returns the first comma-separated field of `line`, trimmed, with one pair of
/// surrounding double quotes (as written by many CSV exporters) removed.
///
/// This is a deliberately naive split: quoted fields that contain commas are
/// not supported.
fn first_csv_field(line: &str) -> &str {
    let field = line.split(',').next().unwrap_or(line).trim();
    field
        .strip_prefix('"')
        .and_then(|inner| inner.strip_suffix('"'))
        .map_or(field, str::trim)
}

/// Reports whether the raw line `first` looks like a CSV header row.
///
/// The first column must equal one of `HEADER_KEYWORDS` (ASCII
/// case-insensitively) and must not contain a `.`. A real domain such as
/// `domain.com` is therefore never mistaken for a header.
fn is_csv_header_row(first: &str) -> bool {
    let first_col = first_csv_field(first);
    !first_col.contains('.')
        && HEADER_KEYWORDS
            .iter()
            .any(|keyword| keyword.eq_ignore_ascii_case(first_col))
}

/// Extracts domains from a plain-text or CSV file's contents.
///
/// - A leading UTF-8 byte-order mark is stripped.
/// - Blank lines and lines starting with `#` are ignored.
/// - If the first remaining line is a header row (e.g. `domain` or
///   `hostname,status`), it is skipped.
/// - From every other line, the first comma-separated column is taken, with
///   surrounding whitespace and double quotes removed.
/// - Only values containing a `.` are kept.
///
/// Input order and duplicates are preserved.
pub fn parse_domains_from_file(content: &str) -> Vec<String> {
    // Spreadsheet exports often begin with a UTF-8 BOM. `str::trim` does not
    // remove it, so it would otherwise stay glued to the first domain.
    let content = content.strip_prefix('\u{feff}').unwrap_or(content);

    let mut lines = content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .peekable();

    // Inspect the header on the raw first data line, before the `.` filter.
    // Filtering first would discard any header and leave this check dead.
    if matches!(lines.peek(), Some(first) if is_csv_header_row(first)) {
        lines.next();
    }

    lines
        .map(first_csv_field)
        .filter(|domain| domain.contains('.'))
        .map(String::from)
        .collect()
}
430
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_domains_from_file() {
        let content = r#"
# This is a comment
example1.com
google2.com
    whitespace3.com
invalid
csv,format,example.org
"#;

        let domains = parse_domains_from_file(content);
        assert_eq!(domains.len(), 3);
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(domains.iter().any(|d| d == expected), "missing {expected}");
        }
    }

    #[test]
    fn test_parse_domains_skip_bare_header() {
        assert_eq!(parse_domains_from_file("domain\nexample.com\n"), ["example.com"]);
    }

    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        let domains = parse_domains_from_file("domain,status\ngoogle.com,live\n");
        assert_eq!(domains, ["google.com"]);
        assert!(domains.iter().all(|d| d != "domain"));
    }

    #[test]
    fn first_domain_with_no_digits_is_kept() {
        assert_eq!(
            parse_domains_from_file("google.com\namazon.com\n"),
            ["google.com", "amazon.com"]
        );
    }

    #[test]
    fn header_row_named_hostname_is_dropped() {
        assert_eq!(parse_domains_from_file("hostname\nexample.com\n"), ["example.com"]);
    }

    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        assert_eq!(
            parse_domains_from_file("domain.com\nexample.com\n"),
            ["domain.com", "example.com"]
        );
    }

    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        for header in ["domain", "Hostname", "URL", "domain,status,notes", " host "] {
            assert!(is_csv_header_row(header), "{header:?} should be a header");
        }
    }

    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        for value in ["domain.com", "google.com", "host.name"] {
            assert!(!is_csv_header_row(value), "{value:?} should not be a header");
        }
    }

    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        for value in ["example", "mydata"] {
            assert!(!is_csv_header_row(value), "{value:?} should not be a header");
        }
    }

    #[tokio::test]
    async fn execute_ssl_failure_path_for_unresolvable_host() {
        // `.invalid` is reserved and never resolves, so the check must fail.
        const HOST: &str = "seer-bulk-ssl-test.invalid";

        let executor = BulkExecutor::new().with_rate_limit(Duration::ZERO);
        let results = executor.execute_ssl(vec![HOST.to_string()]).await;
        assert_eq!(results.len(), 1);

        let result = &results[0];
        assert!(!result.success, "expected failure, got success");
        assert!(result.data.is_none(), "expected no data on failure");
        let err = result.error.as_deref().unwrap_or("");
        assert!(!err.is_empty(), "expected non-empty error, got: {:?}", err);
        assert!(
            matches!(&result.operation, BulkOperation::Ssl { domain } if domain == HOST),
            "expected Ssl variant, got {:?}",
            result.operation
        );
    }

    #[tokio::test]
    #[ignore = "live network: hits cloudflare.com:443; run with --ignored"]
    async fn execute_ssl_live_cloudflare_has_non_empty_chain() {
        let executor = BulkExecutor::new();
        let results = executor
            .execute_ssl(vec!["cloudflare.com".to_string()])
            .await;
        assert_eq!(results.len(), 1);

        let result = &results[0];
        assert!(result.success, "expected success, got error: {:?}", result.error);
        let Some(BulkResultData::Ssl(report)) = &result.data else {
            panic!("expected Ssl data, got {:?}", result.data);
        };

        assert!(!report.chain.is_empty(), "chain must not be empty");
        assert!(report.is_valid, "cloudflare leaf cert should be valid");
        assert!(
            report.days_until_expiry > 0,
            "cert should still be valid in the future, got {} days",
            report.days_until_expiry
        );
        assert!(
            !report.san_names.is_empty(),
            "cloudflare cert should have SAN entries"
        );
    }
}
565}