1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::sync::Mutex;
8use tokio::time::{sleep_until, Instant as TokioInstant};
9use tracing::{debug, info, instrument};
10
11use crate::availability::{AvailabilityChecker, AvailabilityResult};
12use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
13use crate::error::Result;
14use crate::lookup::{LookupResult, SmartLookup};
15use crate::rdap::{RdapClient, RdapResponse};
16use crate::ssl::{SslChecker, SslReport};
17use crate::status::{StatusClient, StatusResponse};
18use crate::whois::{WhoisClient, WhoisResponse};
19
/// Progress hook accepted by [`BulkExecutor::execute`].
///
/// The executor calls it once per finished operation as
/// `progress(completed, total, domain)`: how many operations have finished so
/// far, how many were submitted in total, and the domain of the operation that
/// just finished.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync>;
21
/// One unit of work for [`BulkExecutor::execute`].
///
/// Serialized with an internal `"type"` tag holding the snake_case variant
/// name, e.g. `{"type": "whois", "domain": "example.com"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BulkOperation {
    /// Look the domain up with the WHOIS client.
    Whois {
        domain: String,
    },
    /// Look the domain up with the RDAP client.
    Rdap {
        domain: String,
    },
    /// Resolve `record_type` records for the domain with the DNS resolver.
    Dns {
        domain: String,
        record_type: RecordType,
    },
    /// Run the propagation checker for `record_type` records of the domain.
    Propagation {
        domain: String,
        record_type: RecordType,
    },
    /// Run the smart lookup for the domain.
    Lookup {
        domain: String,
    },
    /// Check the domain with the status client.
    Status {
        domain: String,
    },
    /// Check the domain with the availability checker.
    Avail {
        domain: String,
    },
    /// Run the same smart lookup as `Lookup`, then convert the result into a
    /// `DomainInfo`.
    Info {
        domain: String,
    },
    /// Check the domain with the SSL checker.
    Ssl {
        domain: String,
    },
}
56
/// Payload of a successful [`BulkOperation`], one variant per operation kind.
///
/// Serialized adjacently tagged: `result_type` holds the snake_case variant
/// name and `data` holds the payload itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
pub enum BulkResultData {
    /// Result of a `Whois` operation.
    Whois(WhoisResponse),
    /// Result of an `Rdap` operation (boxed — presumably to keep this enum small).
    Rdap(Box<RdapResponse>),
    /// Records returned by a `Dns` operation.
    Dns(Vec<DnsRecord>),
    /// Result of a `Propagation` operation.
    Propagation(PropagationResult),
    /// Result of a `Lookup` operation.
    Lookup(LookupResult),
    /// Result of a `Status` operation.
    Status(StatusResponse),
    /// Result of an `Avail` operation.
    Avail(AvailabilityResult),
    /// Result of an `Info` operation, derived from a smart lookup result.
    Info(crate::domain_info::DomainInfo),
    /// Result of an `Ssl` operation.
    Ssl(SslReport),
}
71
/// Outcome of a single [`BulkOperation`] run by [`BulkExecutor::execute`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkResult {
    /// The operation this result belongs to.
    pub operation: BulkOperation,
    /// `true` when the operation returned `Ok`.
    pub success: bool,
    /// The payload; `Some` on success, `None` on failure.
    pub data: Option<BulkResultData>,
    /// The error rendered as text; `Some` on failure, `None` on success.
    pub error: Option<String>,
    /// Wall-clock time spent running the operation itself, in milliseconds.
    /// Time spent waiting for a rate-limit slot is not included.
    pub duration_ms: u64,
}
81
/// Runs batches of [`BulkOperation`]s concurrently, with a cap on the number
/// of in-flight operations and an optional minimum spacing between starts.
#[derive(Debug, Clone)]
pub struct BulkExecutor {
    /// Maximum number of operations in flight at once
    /// (`with_concurrency` keeps it within `1..=50`).
    concurrency: usize,
    /// Minimum gap between the start slots handed to consecutive operations;
    /// a zero duration disables rate limiting.
    rate_limit_delay: Duration,
    // One client per operation kind; `execute` lends them to every operation.
    whois_client: WhoisClient,
    rdap_client: RdapClient,
    dns_resolver: DnsResolver,
    propagation_checker: PropagationChecker,
    smart_lookup: SmartLookup,
    status_client: StatusClient,
    availability_checker: AvailabilityChecker,
    ssl_checker: SslChecker,
}
96
97impl Default for BulkExecutor {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl BulkExecutor {
104 pub fn new() -> Self {
105 Self {
106 concurrency: 10,
107 rate_limit_delay: Duration::from_millis(100),
108 whois_client: WhoisClient::new(),
109 rdap_client: RdapClient::new(),
110 dns_resolver: DnsResolver::new(),
111 propagation_checker: PropagationChecker::new(),
112 smart_lookup: SmartLookup::new(),
113 status_client: StatusClient::new(),
114 availability_checker: AvailabilityChecker::new(),
115 ssl_checker: SslChecker::new(),
116 }
117 }
118
119 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
120 self.concurrency = concurrency.clamp(1, 50);
121 self
122 }
123
124 pub fn with_rate_limit(mut self, delay: Duration) -> Self {
125 self.rate_limit_delay = delay;
126 self
127 }
128
129 #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
130 pub async fn execute(
131 &self,
132 operations: Vec<BulkOperation>,
133 progress: Option<ProgressCallback>,
134 ) -> Vec<BulkResult> {
135 let total = operations.len();
136 let completed = Arc::new(AtomicUsize::new(0));
137
138 info!("bulk operation started");
139 debug!(
140 total = total,
141 concurrency = self.concurrency,
142 "Starting bulk execution"
143 );
144
145 let limiter = if self.rate_limit_delay.is_zero() {
158 None
159 } else {
160 Some(Arc::new(Mutex::new(None::<TokioInstant>)))
161 };
162 let rate_limit_delay = self.rate_limit_delay;
163
164 let results: Vec<BulkResult> = stream::iter(operations)
165 .map(|op| {
166 let completed = completed.clone();
167 let progress = progress.as_ref();
168 let limiter = limiter.clone();
169 let whois_client = &self.whois_client;
170 let rdap_client = &self.rdap_client;
171 let dns_resolver = &self.dns_resolver;
172 let propagation_checker = &self.propagation_checker;
173 let smart_lookup = &self.smart_lookup;
174 let status_client = &self.status_client;
175 let availability_checker = &self.availability_checker;
176 let ssl_checker = &self.ssl_checker;
177
178 async move {
179 if let Some(limiter) = &limiter {
183 let my_slot = {
184 let mut next = limiter.lock().await;
185 let now = TokioInstant::now();
186 let slot = match *next {
187 Some(prev) if prev > now => prev,
188 _ => now,
189 };
190 *next = Some(slot + rate_limit_delay);
191 slot
192 };
193 sleep_until(my_slot).await;
194 }
195
196 let start = std::time::Instant::now();
197 let result = execute_operation(
198 &op,
199 &Clients {
200 whois: whois_client,
201 rdap: rdap_client,
202 dns: dns_resolver,
203 propagation: propagation_checker,
204 lookup: smart_lookup,
205 status: status_client,
206 avail: availability_checker,
207 ssl: ssl_checker,
208 },
209 )
210 .await;
211 let duration_ms = start.elapsed().as_millis() as u64;
212
213 let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
214
215 if let Some(progress) = progress {
216 let desc = match &op {
217 BulkOperation::Whois { domain }
218 | BulkOperation::Rdap { domain }
219 | BulkOperation::Dns { domain, .. }
220 | BulkOperation::Propagation { domain, .. }
221 | BulkOperation::Lookup { domain }
222 | BulkOperation::Status { domain }
223 | BulkOperation::Avail { domain }
224 | BulkOperation::Info { domain }
225 | BulkOperation::Ssl { domain } => domain.as_str(),
226 };
227 progress(count, total, desc);
228 }
229
230 match result {
231 Ok(data) => BulkResult {
232 operation: op,
233 success: true,
234 data: Some(data),
235 error: None,
236 duration_ms,
237 },
238 Err(e) => {
239 debug!(error = %e, "Bulk operation failed");
240 BulkResult {
241 operation: op,
242 success: false,
243 data: None,
244 error: Some(e.to_string()),
245 duration_ms,
246 }
247 }
248 }
249 }
250 })
251 .buffer_unordered(self.concurrency)
252 .collect()
253 .await;
254
255 let succeeded = results.iter().filter(|r| r.success).count();
256 let failed = results.iter().filter(|r| !r.success).count();
257 info!(
258 total = total,
259 succeeded = succeeded,
260 failed = failed,
261 "bulk operation completed"
262 );
263
264 results
265 }
266
267 pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
268 let operations = domains
269 .into_iter()
270 .map(|domain| BulkOperation::Whois { domain })
271 .collect();
272 self.execute(operations, None).await
273 }
274
275 pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
276 let operations = domains
277 .into_iter()
278 .map(|domain| BulkOperation::Rdap { domain })
279 .collect();
280 self.execute(operations, None).await
281 }
282
283 pub async fn execute_dns(
284 &self,
285 domains: Vec<String>,
286 record_type: RecordType,
287 ) -> Vec<BulkResult> {
288 let operations = domains
289 .into_iter()
290 .map(|domain| BulkOperation::Dns {
291 domain,
292 record_type,
293 })
294 .collect();
295 self.execute(operations, None).await
296 }
297
298 pub async fn execute_propagation(
299 &self,
300 domains: Vec<String>,
301 record_type: RecordType,
302 ) -> Vec<BulkResult> {
303 let operations = domains
304 .into_iter()
305 .map(|domain| BulkOperation::Propagation {
306 domain,
307 record_type,
308 })
309 .collect();
310 self.execute(operations, None).await
311 }
312
313 pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
314 let operations = domains
315 .into_iter()
316 .map(|domain| BulkOperation::Lookup { domain })
317 .collect();
318 self.execute(operations, None).await
319 }
320
321 pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
322 let operations = domains
323 .into_iter()
324 .map(|domain| BulkOperation::Status { domain })
325 .collect();
326 self.execute(operations, None).await
327 }
328
329 pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
330 let operations = domains
331 .into_iter()
332 .map(|domain| BulkOperation::Avail { domain })
333 .collect();
334 self.execute(operations, None).await
335 }
336
337 pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
338 let operations = domains
339 .into_iter()
340 .map(|domain| BulkOperation::Info { domain })
341 .collect();
342 self.execute(operations, None).await
343 }
344
345 pub async fn execute_ssl(&self, domains: Vec<String>) -> Vec<BulkResult> {
346 let operations = domains
347 .into_iter()
348 .map(|domain| BulkOperation::Ssl { domain })
349 .collect();
350 self.execute(operations, None).await
351 }
352}
353
/// Borrowed view of every client a [`BulkOperation`] may need, bundled so
/// `execute_operation` takes one argument instead of eight.
struct Clients<'a> {
    whois: &'a WhoisClient,
    rdap: &'a RdapClient,
    dns: &'a DnsResolver,
    propagation: &'a PropagationChecker,
    /// Used by both the `Lookup` and `Info` operations.
    lookup: &'a SmartLookup,
    status: &'a StatusClient,
    avail: &'a AvailabilityChecker,
    ssl: &'a SslChecker,
}
364
365async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
366 match op {
367 BulkOperation::Whois { domain } => {
368 let result = clients.whois.lookup(domain).await?;
369 Ok(BulkResultData::Whois(result))
370 }
371 BulkOperation::Rdap { domain } => {
372 let result = clients.rdap.lookup_domain(domain).await?;
373 Ok(BulkResultData::Rdap(Box::new(result)))
374 }
375 BulkOperation::Dns {
376 domain,
377 record_type,
378 } => {
379 let result = clients.dns.resolve(domain, *record_type, None).await?;
380 Ok(BulkResultData::Dns(result))
381 }
382 BulkOperation::Propagation {
383 domain,
384 record_type,
385 } => {
386 let result = clients.propagation.check(domain, *record_type).await?;
387 Ok(BulkResultData::Propagation(result))
388 }
389 BulkOperation::Lookup { domain } => {
390 let result = clients.lookup.lookup(domain).await?;
391 Ok(BulkResultData::Lookup(result))
392 }
393 BulkOperation::Status { domain } => {
394 let result = clients.status.check(domain).await?;
395 Ok(BulkResultData::Status(result))
396 }
397 BulkOperation::Avail { domain } => {
398 let result = clients.avail.check(domain).await?;
399 Ok(BulkResultData::Avail(result))
400 }
401 BulkOperation::Info { domain } => {
402 let result = clients.lookup.lookup(domain).await?;
403 Ok(BulkResultData::Info(
404 crate::domain_info::DomainInfo::from_lookup_result(&result),
405 ))
406 }
407 BulkOperation::Ssl { domain } => {
408 let result = clients.ssl.check(domain).await?;
409 Ok(BulkResultData::Ssl(result))
410 }
411 }
412}
413
/// First-column labels that mark a CSV header row (compared case-insensitively).
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// Returns `true` when `first` looks like a CSV header row rather than data.
///
/// Only the first comma-separated column is examined. It counts as a header
/// when it contains no `.` and equals one of [`HEADER_KEYWORDS`], ignoring
/// ASCII case and surrounding whitespace.
fn is_csv_header_row(first: &str) -> bool {
    let first_col = first.split(',').next().unwrap_or(first).trim();
    // A dotted first column is a real hostname such as `domain.com`, never a label.
    !first_col.contains('.')
        && HEADER_KEYWORDS
            .iter()
            .any(|keyword| first_col.eq_ignore_ascii_case(keyword))
}

/// Extracts domain names from the text of a domain list or CSV file.
///
/// Each non-empty line that does not start with `#` contributes its first
/// comma-separated column, trimmed. Entries without a `.` are discarded. A
/// leading UTF-8 byte-order mark is ignored, and a header row in first
/// position (see [`is_csv_header_row`]) is skipped.
///
/// Returns the domains in file order; the result is empty when nothing
/// qualifies.
pub fn parse_domains_from_file(content: &str) -> Vec<String> {
    // U+FEFF is not whitespace, so `trim` would leave it glued to the first entry.
    let content = content.strip_prefix('\u{feff}').unwrap_or(content);

    let mut rows = content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .peekable();

    // Check for a header on the raw first row. After the dot filter below a
    // header can no longer be present, so checking there would never match.
    let _ = rows.next_if(|row| is_csv_header_row(row));

    rows.map(|row| row.split(',').next().unwrap_or(row).trim())
        .filter(|domain| domain.contains('.'))
        .map(String::from)
        .collect()
}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Comments, blank lines, dot-less entries and CSV rows whose first
    /// column has no dot are skipped; surrounding whitespace is trimmed.
    #[test]
    fn test_parse_domains_from_file() {
        let content = r#"
# This is a comment
example1.com
google2.com
  whitespace3.com
invalid
csv,format,example.org
"#;

        let parsed = parse_domains_from_file(content);
        assert_eq!(parsed.len(), 3);
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(parsed.contains(&expected.to_string()));
        }
    }

    /// A single-column header line never shows up in the output.
    #[test]
    fn test_parse_domains_skip_bare_header() {
        let parsed = parse_domains_from_file("domain\nexample.com\n");
        assert_eq!(parsed, vec!["example.com"]);
    }

    /// A multi-column CSV header is dropped and only the first column of
    /// data rows is kept.
    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        let parsed = parse_domains_from_file("domain,status\ngoogle.com,live\n");
        assert_eq!(parsed, vec!["google.com"]);
        assert!(!parsed.contains(&"domain".to_string()));
    }

    /// A first row that is a real domain must not be mistaken for a header.
    #[test]
    fn first_domain_with_no_digits_is_kept() {
        let parsed = parse_domains_from_file("google.com\namazon.com\n");
        assert_eq!(parsed, vec!["google.com", "amazon.com"]);
    }

    #[test]
    fn header_row_named_hostname_is_dropped() {
        let parsed = parse_domains_from_file("hostname\nexample.com\n");
        assert_eq!(parsed, vec!["example.com"]);
    }

    /// `domain.com` starts with a header keyword but is a real hostname.
    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        let parsed = parse_domains_from_file("domain.com\nexample.com\n");
        assert_eq!(parsed, vec!["domain.com", "example.com"]);
    }

    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        assert!(is_csv_header_row("domain"));
        assert!(is_csv_header_row("Hostname"));
        assert!(is_csv_header_row("URL"));
        assert!(is_csv_header_row("domain,status,notes"));
        assert!(is_csv_header_row(" host "));
    }

    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        assert!(!is_csv_header_row("domain.com"));
        assert!(!is_csv_header_row("google.com"));
        assert!(!is_csv_header_row("host.name"));
    }

    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        assert!(!is_csv_header_row("example"));
        assert!(!is_csv_header_row("mydata"));
    }

    /// With a 50 ms gap and four operations, the whole batch must finish
    /// well inside two seconds if dispatch is not serialized.
    #[tokio::test]
    async fn rate_limiter_dispatches_in_parallel_not_serialized() {
        let executor = BulkExecutor::new()
            .with_concurrency(5)
            .with_rate_limit(Duration::from_millis(50));

        let started = std::time::Instant::now();
        let domains: Vec<String> = (1..=4)
            .map(|i| format!("seer-rl-{i}.invalid"))
            .collect();
        let results = executor.execute_ssl(domains).await;
        let elapsed = started.elapsed();

        assert_eq!(results.len(), 4);
        assert!(
            elapsed < Duration::from_secs(2),
            "rate-limited dispatch should run in parallel; took {:?}",
            elapsed
        );
    }

    /// An unresolvable host yields a failed result that carries an error
    /// message, no data, and the original operation.
    #[tokio::test]
    async fn execute_ssl_failure_path_for_unresolvable_host() {
        let executor = BulkExecutor::new().with_rate_limit(Duration::ZERO);
        let results = executor
            .execute_ssl(vec!["seer-bulk-ssl-test.invalid".to_string()])
            .await;

        assert_eq!(results.len(), 1);
        let outcome = &results[0];
        assert!(!outcome.success, "expected failure, got success");
        assert!(outcome.data.is_none(), "expected no data on failure");

        let err = outcome.error.as_deref().unwrap_or_default();
        assert!(!err.is_empty(), "expected non-empty error, got: {:?}", err);
        assert!(
            matches!(&outcome.operation, BulkOperation::Ssl { domain } if domain == "seer-bulk-ssl-test.invalid"),
            "expected Ssl variant, got {:?}",
            outcome.operation
        );
    }

    #[tokio::test]
    #[ignore = "live network: hits cloudflare.com:443; run with --ignored"]
    async fn execute_ssl_live_cloudflare_has_non_empty_chain() {
        let executor = BulkExecutor::new();
        let results = executor
            .execute_ssl(vec!["cloudflare.com".to_string()])
            .await;

        assert_eq!(results.len(), 1);
        let outcome = &results[0];
        assert!(
            outcome.success,
            "expected success, got error: {:?}",
            outcome.error
        );

        let Some(BulkResultData::Ssl(report)) = &outcome.data else {
            panic!("expected Ssl data, got {:?}", outcome.data);
        };
        assert!(!report.chain.is_empty(), "chain must not be empty");
        assert!(report.is_valid, "cloudflare leaf cert should be valid");
        assert!(
            report.days_until_expiry > 0,
            "cert should still be valid in the future, got {} days",
            report.days_until_expiry
        );
        assert!(
            !report.san_names.is_empty(),
            "cloudflare cert should have SAN entries"
        );
    }
}