1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::sync::Mutex;
8use tokio::time::{interval, MissedTickBehavior};
9use tracing::{debug, info, instrument};
10
11use crate::availability::{AvailabilityChecker, AvailabilityResult};
12use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
13use crate::error::Result;
14use crate::lookup::{LookupResult, SmartLookup};
15use crate::rdap::{RdapClient, RdapResponse};
16use crate::ssl::{SslChecker, SslReport};
17use crate::status::{StatusClient, StatusResponse};
18use crate::whois::{WhoisClient, WhoisResponse};
19
20pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync>;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(tag = "type", rename_all = "snake_case")]
25pub enum BulkOperation {
26 Whois {
27 domain: String,
28 },
29 Rdap {
30 domain: String,
31 },
32 Dns {
33 domain: String,
34 record_type: RecordType,
35 },
36 Propagation {
37 domain: String,
38 record_type: RecordType,
39 },
40 Lookup {
41 domain: String,
42 },
43 Status {
44 domain: String,
45 },
46 Avail {
47 domain: String,
48 },
49 Info {
50 domain: String,
51 },
52 Ssl {
53 domain: String,
54 },
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
60pub enum BulkResultData {
61 Whois(WhoisResponse),
62 Rdap(Box<RdapResponse>),
63 Dns(Vec<DnsRecord>),
64 Propagation(PropagationResult),
65 Lookup(LookupResult),
66 Status(StatusResponse),
67 Avail(AvailabilityResult),
68 Info(crate::domain_info::DomainInfo),
69 Ssl(SslReport),
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct BulkResult {
75 pub operation: BulkOperation,
76 pub success: bool,
77 pub data: Option<BulkResultData>,
78 pub error: Option<String>,
79 pub duration_ms: u64,
80}
81
82#[derive(Debug, Clone)]
84pub struct BulkExecutor {
85 concurrency: usize,
86 rate_limit_delay: Duration,
87 whois_client: WhoisClient,
88 rdap_client: RdapClient,
89 dns_resolver: DnsResolver,
90 propagation_checker: PropagationChecker,
91 smart_lookup: SmartLookup,
92 status_client: StatusClient,
93 availability_checker: AvailabilityChecker,
94 ssl_checker: SslChecker,
95}
96
97impl Default for BulkExecutor {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl BulkExecutor {
104 pub fn new() -> Self {
105 Self {
106 concurrency: 10,
107 rate_limit_delay: Duration::from_millis(100),
108 whois_client: WhoisClient::new(),
109 rdap_client: RdapClient::new(),
110 dns_resolver: DnsResolver::new(),
111 propagation_checker: PropagationChecker::new(),
112 smart_lookup: SmartLookup::new(),
113 status_client: StatusClient::new(),
114 availability_checker: AvailabilityChecker::new(),
115 ssl_checker: SslChecker::new(),
116 }
117 }
118
119 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
120 self.concurrency = concurrency.clamp(1, 50);
121 self
122 }
123
124 pub fn with_rate_limit(mut self, delay: Duration) -> Self {
125 self.rate_limit_delay = delay;
126 self
127 }
128
129 #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
130 pub async fn execute(
131 &self,
132 operations: Vec<BulkOperation>,
133 progress: Option<ProgressCallback>,
134 ) -> Vec<BulkResult> {
135 let total = operations.len();
136 let completed = Arc::new(AtomicUsize::new(0));
137
138 info!("bulk operation started");
139 debug!(
140 total = total,
141 concurrency = self.concurrency,
142 "Starting bulk execution"
143 );
144
145 let limiter = if self.rate_limit_delay.is_zero() {
153 None
154 } else {
155 let mut iv = interval(self.rate_limit_delay);
156 iv.set_missed_tick_behavior(MissedTickBehavior::Delay);
159 Some(Arc::new(Mutex::new(iv)))
160 };
161
162 let results: Vec<BulkResult> = stream::iter(operations)
163 .map(|op| {
164 let completed = completed.clone();
165 let progress = progress.as_ref();
166 let limiter = limiter.clone();
167 let whois_client = &self.whois_client;
168 let rdap_client = &self.rdap_client;
169 let dns_resolver = &self.dns_resolver;
170 let propagation_checker = &self.propagation_checker;
171 let smart_lookup = &self.smart_lookup;
172 let status_client = &self.status_client;
173 let availability_checker = &self.availability_checker;
174 let ssl_checker = &self.ssl_checker;
175
176 async move {
177 if let Some(limiter) = &limiter {
182 limiter.lock().await.tick().await;
183 }
184
185 let start = std::time::Instant::now();
186 let result = execute_operation(
187 &op,
188 &Clients {
189 whois: whois_client,
190 rdap: rdap_client,
191 dns: dns_resolver,
192 propagation: propagation_checker,
193 lookup: smart_lookup,
194 status: status_client,
195 avail: availability_checker,
196 ssl: ssl_checker,
197 },
198 )
199 .await;
200 let duration_ms = start.elapsed().as_millis() as u64;
201
202 let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
203
204 if let Some(progress) = progress {
205 let desc = match &op {
206 BulkOperation::Whois { domain }
207 | BulkOperation::Rdap { domain }
208 | BulkOperation::Dns { domain, .. }
209 | BulkOperation::Propagation { domain, .. }
210 | BulkOperation::Lookup { domain }
211 | BulkOperation::Status { domain }
212 | BulkOperation::Avail { domain }
213 | BulkOperation::Info { domain }
214 | BulkOperation::Ssl { domain } => domain.as_str(),
215 };
216 progress(count, total, desc);
217 }
218
219 match result {
220 Ok(data) => BulkResult {
221 operation: op,
222 success: true,
223 data: Some(data),
224 error: None,
225 duration_ms,
226 },
227 Err(e) => {
228 debug!(error = %e, "Bulk operation failed");
229 BulkResult {
230 operation: op,
231 success: false,
232 data: None,
233 error: Some(e.to_string()),
234 duration_ms,
235 }
236 }
237 }
238 }
239 })
240 .buffer_unordered(self.concurrency)
241 .collect()
242 .await;
243
244 let succeeded = results.iter().filter(|r| r.success).count();
245 let failed = results.iter().filter(|r| !r.success).count();
246 info!(
247 total = total,
248 succeeded = succeeded,
249 failed = failed,
250 "bulk operation completed"
251 );
252
253 results
254 }
255
256 pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
257 let operations = domains
258 .into_iter()
259 .map(|domain| BulkOperation::Whois { domain })
260 .collect();
261 self.execute(operations, None).await
262 }
263
264 pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
265 let operations = domains
266 .into_iter()
267 .map(|domain| BulkOperation::Rdap { domain })
268 .collect();
269 self.execute(operations, None).await
270 }
271
272 pub async fn execute_dns(
273 &self,
274 domains: Vec<String>,
275 record_type: RecordType,
276 ) -> Vec<BulkResult> {
277 let operations = domains
278 .into_iter()
279 .map(|domain| BulkOperation::Dns {
280 domain,
281 record_type,
282 })
283 .collect();
284 self.execute(operations, None).await
285 }
286
287 pub async fn execute_propagation(
288 &self,
289 domains: Vec<String>,
290 record_type: RecordType,
291 ) -> Vec<BulkResult> {
292 let operations = domains
293 .into_iter()
294 .map(|domain| BulkOperation::Propagation {
295 domain,
296 record_type,
297 })
298 .collect();
299 self.execute(operations, None).await
300 }
301
302 pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
303 let operations = domains
304 .into_iter()
305 .map(|domain| BulkOperation::Lookup { domain })
306 .collect();
307 self.execute(operations, None).await
308 }
309
310 pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
311 let operations = domains
312 .into_iter()
313 .map(|domain| BulkOperation::Status { domain })
314 .collect();
315 self.execute(operations, None).await
316 }
317
318 pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
319 let operations = domains
320 .into_iter()
321 .map(|domain| BulkOperation::Avail { domain })
322 .collect();
323 self.execute(operations, None).await
324 }
325
326 pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
327 let operations = domains
328 .into_iter()
329 .map(|domain| BulkOperation::Info { domain })
330 .collect();
331 self.execute(operations, None).await
332 }
333
334 pub async fn execute_ssl(&self, domains: Vec<String>) -> Vec<BulkResult> {
335 let operations = domains
336 .into_iter()
337 .map(|domain| BulkOperation::Ssl { domain })
338 .collect();
339 self.execute(operations, None).await
340 }
341}
342
343struct Clients<'a> {
344 whois: &'a WhoisClient,
345 rdap: &'a RdapClient,
346 dns: &'a DnsResolver,
347 propagation: &'a PropagationChecker,
348 lookup: &'a SmartLookup,
349 status: &'a StatusClient,
350 avail: &'a AvailabilityChecker,
351 ssl: &'a SslChecker,
352}
353
354async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
355 match op {
356 BulkOperation::Whois { domain } => {
357 let result = clients.whois.lookup(domain).await?;
358 Ok(BulkResultData::Whois(result))
359 }
360 BulkOperation::Rdap { domain } => {
361 let result = clients.rdap.lookup_domain(domain).await?;
362 Ok(BulkResultData::Rdap(Box::new(result)))
363 }
364 BulkOperation::Dns {
365 domain,
366 record_type,
367 } => {
368 let result = clients.dns.resolve(domain, *record_type, None).await?;
369 Ok(BulkResultData::Dns(result))
370 }
371 BulkOperation::Propagation {
372 domain,
373 record_type,
374 } => {
375 let result = clients.propagation.check(domain, *record_type).await?;
376 Ok(BulkResultData::Propagation(result))
377 }
378 BulkOperation::Lookup { domain } => {
379 let result = clients.lookup.lookup(domain).await?;
380 Ok(BulkResultData::Lookup(result))
381 }
382 BulkOperation::Status { domain } => {
383 let result = clients.status.check(domain).await?;
384 Ok(BulkResultData::Status(result))
385 }
386 BulkOperation::Avail { domain } => {
387 let result = clients.avail.check(domain).await?;
388 Ok(BulkResultData::Avail(result))
389 }
390 BulkOperation::Info { domain } => {
391 let result = clients.lookup.lookup(domain).await?;
392 Ok(BulkResultData::Info(
393 crate::domain_info::DomainInfo::from_lookup_result(&result),
394 ))
395 }
396 BulkOperation::Ssl { domain } => {
397 let result = clients.ssl.check(domain).await?;
398 Ok(BulkResultData::Ssl(result))
399 }
400 }
401}
402
/// Column labels that mark a line as a CSV header when one of them appears alone
/// in the first column of the first data line.
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// Returns the first comma-separated field of `line`.
///
/// Surrounding whitespace is trimmed, and any enclosing double quotes (as written
/// by spreadsheet CSV exports) are removed.
fn first_csv_column(line: &str) -> &str {
    let field = line.split_once(',').map_or(line, |(head, _)| head);
    field.trim().trim_matches('"').trim()
}

/// Returns `true` when `first` looks like a CSV header row.
///
/// That is the case when its first column (whitespace and quotes ignored) is one
/// of [`HEADER_KEYWORDS`], compared ASCII case-insensitively. A first column that
/// contains a `.` is always treated as data, so a real domain such as `domain.com`
/// is never mistaken for a header.
fn is_csv_header_row(first: &str) -> bool {
    let column = first_csv_column(first);
    !column.contains('.')
        && HEADER_KEYWORDS
            .iter()
            .any(|keyword| column.eq_ignore_ascii_case(keyword))
}

/// Extracts domain names from the text of a domain-list file.
///
/// The input may be a plain list (one domain per line) or CSV, in which case only
/// the first column is used. Processing steps:
///
/// - A leading UTF-8 byte-order mark is removed.
/// - Blank lines and lines starting with `#` are skipped.
/// - If the first remaining line is a header row (see [`is_csv_header_row`]), it
///   is dropped.
/// - Any first-column value without a `.` is discarded as not being a domain.
///
/// Order is preserved and duplicates are kept.
pub fn parse_domains_from_file(content: &str) -> Vec<String> {
    // Files saved by some Windows editors start with U+FEFF. `str::trim` does not
    // remove it, so without this the first domain would carry an invisible prefix.
    let content = content.strip_prefix('\u{feff}').unwrap_or(content);

    let mut data_lines = content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .peekable();

    // Only the very first data line can be a header. It must be checked before
    // the dot filter below, which would otherwise hide it.
    if matches!(data_lines.peek(), Some(first) if is_csv_header_row(first)) {
        data_lines.next();
    }

    data_lines
        .map(first_csv_column)
        .filter(|column| column.contains('.'))
        .map(str::to_string)
        .collect()
}
451
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that parsing `input` yields exactly `expected`, in order.
    fn assert_parsed(input: &str, expected: &[&str]) {
        assert_eq!(parse_domains_from_file(input), expected);
    }

    #[test]
    fn test_parse_domains_from_file() {
        let content = r#"
# This is a comment
example1.com
google2.com
    whitespace3.com
invalid
csv,format,example.org
"#;

        let domains = parse_domains_from_file(content);
        assert_eq!(domains.len(), 3);
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(domains.iter().any(|d| d == expected));
        }
    }

    #[test]
    fn test_parse_domains_skip_bare_header() {
        assert_parsed("domain\nexample.com\n", &["example.com"]);
    }

    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        let domains = parse_domains_from_file("domain,status\ngoogle.com,live\n");
        assert_eq!(domains, vec!["google.com"]);
        assert!(domains.iter().all(|d| d != "domain"));
    }

    #[test]
    fn first_domain_with_no_digits_is_kept() {
        assert_parsed("google.com\namazon.com\n", &["google.com", "amazon.com"]);
    }

    #[test]
    fn header_row_named_hostname_is_dropped() {
        assert_parsed("hostname\nexample.com\n", &["example.com"]);
    }

    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        assert_parsed("domain.com\nexample.com\n", &["domain.com", "example.com"]);
    }

    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        for row in ["domain", "Hostname", "URL", "domain,status,notes", " host "] {
            assert!(is_csv_header_row(row));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        for row in ["domain.com", "google.com", "host.name"] {
            assert!(!is_csv_header_row(row));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        for row in ["example", "mydata"] {
            assert!(!is_csv_header_row(row));
        }
    }

    #[tokio::test]
    async fn execute_ssl_failure_path_for_unresolvable_host() {
        // `.invalid` is a reserved TLD (RFC 6761) that never resolves, so the SSL
        // check must fail. Rate limiting is disabled to keep the test quick.
        const HOST: &str = "seer-bulk-ssl-test.invalid";
        let executor = BulkExecutor::new().with_rate_limit(Duration::ZERO);

        let results = executor.execute_ssl(vec![HOST.to_string()]).await;
        assert_eq!(results.len(), 1);

        let outcome = &results[0];
        assert!(!outcome.success, "expected failure, got success");
        assert!(outcome.data.is_none(), "expected no data on failure");

        let err = outcome.error.as_deref().unwrap_or("");
        assert!(!err.is_empty(), "expected non-empty error, got: {:?}", err);
        assert!(
            matches!(outcome.operation, BulkOperation::Ssl { ref domain } if domain == HOST),
            "expected Ssl variant, got {:?}",
            outcome.operation
        );
    }

    #[tokio::test]
    #[ignore = "live network: hits cloudflare.com:443; run with --ignored"]
    async fn execute_ssl_live_cloudflare_has_non_empty_chain() {
        let results = BulkExecutor::new()
            .execute_ssl(vec!["cloudflare.com".to_string()])
            .await;
        assert_eq!(results.len(), 1);

        let outcome = &results[0];
        assert!(outcome.success, "expected success, got error: {:?}", outcome.error);

        let Some(BulkResultData::Ssl(ref report)) = outcome.data else {
            panic!("expected Ssl data, got {:?}", outcome.data);
        };
        assert!(!report.chain.is_empty(), "chain must not be empty");
        assert!(report.is_valid, "cloudflare leaf cert should be valid");
        assert!(
            report.days_until_expiry > 0,
            "cert should still be valid in the future, got {} days",
            report.days_until_expiry
        );
        assert!(
            !report.san_names.is_empty(),
            "cloudflare cert should have SAN entries"
        );
    }
}