1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::sync::Mutex;
8use tokio::time::{sleep_until, Instant as TokioInstant};
9use tracing::{debug, info, instrument};
10
11use crate::availability::{AvailabilityChecker, AvailabilityResult};
12use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
13use crate::error::Result;
14use crate::lookup::{LookupResult, SmartLookup};
15use crate::rdap::{RdapClient, RdapResponse};
16use crate::ssl::{SslChecker, SslReport};
17use crate::status::{StatusClient, StatusResponse};
18use crate::whois::{WhoisClient, WhoisResponse};
19
/// Callback invoked by [`BulkExecutor::execute`] each time an operation finishes.
///
/// Arguments, in order: the number of operations completed so far, the total
/// number of operations in the batch, and the domain of the operation that
/// just finished.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync>;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(tag = "type", rename_all = "snake_case")]
25pub enum BulkOperation {
26 Whois {
27 domain: String,
28 },
29 Rdap {
30 domain: String,
31 },
32 Dns {
33 domain: String,
34 record_type: RecordType,
35 },
36 Propagation {
37 domain: String,
38 record_type: RecordType,
39 },
40 Lookup {
41 domain: String,
42 },
43 Status {
44 domain: String,
45 },
46 Avail {
47 domain: String,
48 },
49 Info {
50 domain: String,
51 },
52 Ssl {
53 domain: String,
54 },
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
60pub enum BulkResultData {
61 Whois(WhoisResponse),
62 Rdap(Box<RdapResponse>),
63 Dns(Vec<DnsRecord>),
64 Propagation(PropagationResult),
65 Lookup(LookupResult),
66 Status(StatusResponse),
67 Avail(AvailabilityResult),
68 Info(crate::domain_info::DomainInfo),
69 Ssl(SslReport),
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct BulkResult {
75 pub operation: BulkOperation,
76 pub success: bool,
77 pub data: Option<BulkResultData>,
78 pub error: Option<String>,
79 pub duration_ms: u64,
80}
81
82#[derive(Debug, Clone)]
84pub struct BulkExecutor {
85 concurrency: usize,
86 rate_limit_delay: Duration,
87 whois_client: WhoisClient,
88 rdap_client: RdapClient,
89 dns_resolver: DnsResolver,
90 propagation_checker: PropagationChecker,
91 smart_lookup: SmartLookup,
92 status_client: StatusClient,
93 availability_checker: AvailabilityChecker,
94 ssl_checker: SslChecker,
95}
96
97impl Default for BulkExecutor {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl BulkExecutor {
104 pub fn new() -> Self {
105 Self {
106 concurrency: 10,
107 rate_limit_delay: Duration::from_millis(100),
108 whois_client: WhoisClient::new(),
109 rdap_client: RdapClient::new(),
110 dns_resolver: DnsResolver::new(),
111 propagation_checker: PropagationChecker::new(),
112 smart_lookup: SmartLookup::new(),
113 status_client: StatusClient::new(),
114 availability_checker: AvailabilityChecker::new(),
115 ssl_checker: SslChecker::new(),
116 }
117 }
118
119 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
120 self.concurrency = concurrency.clamp(1, 50);
121 self
122 }
123
124 pub fn with_rate_limit(mut self, delay: Duration) -> Self {
125 self.rate_limit_delay = delay;
126 self
127 }
128
129 #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
130 pub async fn execute(
131 &self,
132 operations: Vec<BulkOperation>,
133 progress: Option<ProgressCallback>,
134 ) -> Vec<BulkResult> {
135 let total = operations.len();
136 let completed = Arc::new(AtomicUsize::new(0));
137
138 info!("bulk operation started");
139 debug!(
140 total = total,
141 concurrency = self.concurrency,
142 "Starting bulk execution"
143 );
144
145 let limiter = if self.rate_limit_delay.is_zero() {
158 None
159 } else {
160 Some(Arc::new(Mutex::new(None::<TokioInstant>)))
161 };
162 let rate_limit_delay = self.rate_limit_delay;
163
164 let results: Vec<BulkResult> = stream::iter(operations)
165 .map(|op| {
166 let completed = completed.clone();
167 let progress = progress.as_ref();
168 let limiter = limiter.clone();
169 let whois_client = &self.whois_client;
170 let rdap_client = &self.rdap_client;
171 let dns_resolver = &self.dns_resolver;
172 let propagation_checker = &self.propagation_checker;
173 let smart_lookup = &self.smart_lookup;
174 let status_client = &self.status_client;
175 let availability_checker = &self.availability_checker;
176 let ssl_checker = &self.ssl_checker;
177
178 async move {
179 if let Some(limiter) = &limiter {
183 let my_slot = {
184 let mut next = limiter.lock().await;
185 let now = TokioInstant::now();
186 let slot = match *next {
187 Some(prev) if prev > now => prev,
188 _ => now,
189 };
190 *next = Some(slot + rate_limit_delay);
191 slot
192 };
193 sleep_until(my_slot).await;
194 }
195
196 let start = std::time::Instant::now();
197 let result = execute_operation(
198 &op,
199 &Clients {
200 whois: whois_client,
201 rdap: rdap_client,
202 dns: dns_resolver,
203 propagation: propagation_checker,
204 lookup: smart_lookup,
205 status: status_client,
206 avail: availability_checker,
207 ssl: ssl_checker,
208 },
209 )
210 .await;
211 let duration_ms = start.elapsed().as_millis() as u64;
212
213 let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
214
215 if let Some(progress) = progress {
216 let desc = match &op {
217 BulkOperation::Whois { domain }
218 | BulkOperation::Rdap { domain }
219 | BulkOperation::Dns { domain, .. }
220 | BulkOperation::Propagation { domain, .. }
221 | BulkOperation::Lookup { domain }
222 | BulkOperation::Status { domain }
223 | BulkOperation::Avail { domain }
224 | BulkOperation::Info { domain }
225 | BulkOperation::Ssl { domain } => domain.as_str(),
226 };
227 progress(count, total, desc);
228 }
229
230 match result {
231 Ok(data) => BulkResult {
232 operation: op,
233 success: true,
234 data: Some(data),
235 error: None,
236 duration_ms,
237 },
238 Err(e) => {
239 debug!(error = %e, "Bulk operation failed");
240 BulkResult {
241 operation: op,
242 success: false,
243 data: None,
244 error: Some(e.sanitized_message()),
247 duration_ms,
248 }
249 }
250 }
251 }
252 })
253 .buffer_unordered(self.concurrency)
254 .collect()
255 .await;
256
257 let succeeded = results.iter().filter(|r| r.success).count();
258 let failed = results.iter().filter(|r| !r.success).count();
259 info!(
260 total = total,
261 succeeded = succeeded,
262 failed = failed,
263 "bulk operation completed"
264 );
265
266 results
267 }
268
269 pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
270 let operations = domains
271 .into_iter()
272 .map(|domain| BulkOperation::Whois { domain })
273 .collect();
274 self.execute(operations, None).await
275 }
276
277 pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
278 let operations = domains
279 .into_iter()
280 .map(|domain| BulkOperation::Rdap { domain })
281 .collect();
282 self.execute(operations, None).await
283 }
284
285 pub async fn execute_dns(
286 &self,
287 domains: Vec<String>,
288 record_type: RecordType,
289 ) -> Vec<BulkResult> {
290 let operations = domains
291 .into_iter()
292 .map(|domain| BulkOperation::Dns {
293 domain,
294 record_type,
295 })
296 .collect();
297 self.execute(operations, None).await
298 }
299
300 pub async fn execute_propagation(
301 &self,
302 domains: Vec<String>,
303 record_type: RecordType,
304 ) -> Vec<BulkResult> {
305 let operations = domains
306 .into_iter()
307 .map(|domain| BulkOperation::Propagation {
308 domain,
309 record_type,
310 })
311 .collect();
312 self.execute(operations, None).await
313 }
314
315 pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
316 let operations = domains
317 .into_iter()
318 .map(|domain| BulkOperation::Lookup { domain })
319 .collect();
320 self.execute(operations, None).await
321 }
322
323 pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
324 let operations = domains
325 .into_iter()
326 .map(|domain| BulkOperation::Status { domain })
327 .collect();
328 self.execute(operations, None).await
329 }
330
331 pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
332 let operations = domains
333 .into_iter()
334 .map(|domain| BulkOperation::Avail { domain })
335 .collect();
336 self.execute(operations, None).await
337 }
338
339 pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
340 let operations = domains
341 .into_iter()
342 .map(|domain| BulkOperation::Info { domain })
343 .collect();
344 self.execute(operations, None).await
345 }
346
347 pub async fn execute_ssl(&self, domains: Vec<String>) -> Vec<BulkResult> {
348 let operations = domains
349 .into_iter()
350 .map(|domain| BulkOperation::Ssl { domain })
351 .collect();
352 self.execute(operations, None).await
353 }
354}
355
356struct Clients<'a> {
357 whois: &'a WhoisClient,
358 rdap: &'a RdapClient,
359 dns: &'a DnsResolver,
360 propagation: &'a PropagationChecker,
361 lookup: &'a SmartLookup,
362 status: &'a StatusClient,
363 avail: &'a AvailabilityChecker,
364 ssl: &'a SslChecker,
365}
366
367async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
368 match op {
369 BulkOperation::Whois { domain } => {
370 let result = clients.whois.lookup(domain).await?;
371 Ok(BulkResultData::Whois(result))
372 }
373 BulkOperation::Rdap { domain } => {
374 let result = clients.rdap.lookup_domain(domain).await?;
375 Ok(BulkResultData::Rdap(Box::new(result)))
376 }
377 BulkOperation::Dns {
378 domain,
379 record_type,
380 } => {
381 let result = clients.dns.resolve(domain, *record_type, None).await?;
382 Ok(BulkResultData::Dns(result))
383 }
384 BulkOperation::Propagation {
385 domain,
386 record_type,
387 } => {
388 let result = clients.propagation.check(domain, *record_type).await?;
389 Ok(BulkResultData::Propagation(result))
390 }
391 BulkOperation::Lookup { domain } => {
392 let result = clients.lookup.lookup(domain).await?;
393 Ok(BulkResultData::Lookup(result))
394 }
395 BulkOperation::Status { domain } => {
396 let result = clients.status.check(domain).await?;
397 Ok(BulkResultData::Status(result))
398 }
399 BulkOperation::Avail { domain } => {
400 let result = clients.avail.check(domain).await?;
401 Ok(BulkResultData::Avail(result))
402 }
403 BulkOperation::Info { domain } => {
404 let result = clients.lookup.lookup(domain).await?;
405 Ok(BulkResultData::Info(
406 crate::domain_info::DomainInfo::from_lookup_result(&result),
407 ))
408 }
409 BulkOperation::Ssl { domain } => {
410 let result = clients.ssl.check(domain).await?;
411 Ok(BulkResultData::Ssl(result))
412 }
413 }
414}
415
/// Column labels that mark a line as a CSV header rather than data.
/// All entries are lowercase ASCII.
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// Returns `true` when the first comma-separated column of `first` is one of
/// [`HEADER_KEYWORDS`], compared case-insensitively after trimming.
///
/// A first column containing a `.` is never treated as a header, so real
/// domains such as `domain.com` or `host.name` are kept.
fn is_csv_header_row(first: &str) -> bool {
    let first_col = first
        .split_once(',')
        .map_or(first, |(head, _rest)| head)
        .trim();

    if first_col.contains('.') {
        return false;
    }

    // The keywords are plain ASCII, so an ASCII-only case-insensitive compare
    // matches the same inputs as lowercasing first, without allocating.
    HEADER_KEYWORDS
        .iter()
        .any(|keyword| first_col.eq_ignore_ascii_case(keyword))
}
434
435pub fn parse_domains_from_file(content: &str) -> Vec<String> {
436 let mut domains: Vec<String> = content
437 .lines()
438 .map(|line| line.trim())
439 .filter(|line| !line.is_empty() && !line.starts_with('#'))
440 .map(|line| {
441 line.split(',').next().unwrap_or(line).trim().to_string()
443 })
444 .filter(|domain| domain.contains('.'))
445 .collect();
446
447 if domains
455 .first()
456 .map(|d| is_csv_header_row(d))
457 .unwrap_or(false)
458 {
459 domains.remove(0);
460 }
461
462 domains
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    /// Mixed input: a comment, blank lines, a padded entry, a dot-less word
    /// and a CSV row whose first column has no dot.
    #[test]
    fn test_parse_domains_from_file() {
        let content = [
            "",
            "# This is a comment",
            "example1.com",
            "google2.com",
            "  whitespace3.com  ",
            "invalid",
            "csv,format,example.org",
            "",
        ]
        .join("\n");

        let domains = parse_domains_from_file(&content);

        assert_eq!(domains.len(), 3);
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(domains.contains(&expected.to_string()));
        }
    }

    /// A lone `domain` header line is not returned as a domain.
    #[test]
    fn test_parse_domains_skip_bare_header() {
        let parsed = parse_domains_from_file("domain\nexample.com\n");
        assert_eq!(parsed, vec!["example.com"]);
    }

    /// A multi-column header contributes nothing; only the data row's first
    /// column is returned.
    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        let parsed = parse_domains_from_file("domain,status\ngoogle.com,live\n");
        assert_eq!(parsed, vec!["google.com"]);
        assert!(!parsed.contains(&"domain".to_string()));
    }

    /// The first entry is kept even when it contains no digits.
    #[test]
    fn first_domain_with_no_digits_is_kept() {
        let parsed = parse_domains_from_file("google.com\namazon.com\n");
        assert_eq!(parsed, vec!["google.com", "amazon.com"]);
    }

    /// `hostname` is treated like any other header label.
    #[test]
    fn header_row_named_hostname_is_dropped() {
        let parsed = parse_domains_from_file("hostname\nexample.com\n");
        assert_eq!(parsed, vec!["example.com"]);
    }

    /// A real domain that merely starts with a header keyword is kept.
    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        let parsed = parse_domains_from_file("domain.com\nexample.com\n");
        assert_eq!(parsed, vec!["domain.com", "example.com"]);
    }

    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        for header in ["domain", "Hostname", "URL", "domain,status,notes", " host "] {
            assert!(is_csv_header_row(header));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        for dotted in ["domain.com", "google.com", "host.name"] {
            assert!(!is_csv_header_row(dotted));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        for word in ["example", "mydata"] {
            assert!(!is_csv_header_row(word));
        }
    }

    /// Four rate-limited operations with concurrency 5 must finish within the
    /// 2 s bound. NOTE(review): relies on `.invalid` hosts failing quickly on
    /// the machine running the test — confirm for sandboxed CI.
    #[tokio::test]
    async fn rate_limiter_dispatches_in_parallel_not_serialized() {
        use std::time::Instant;

        let executor = BulkExecutor::new()
            .with_concurrency(5)
            .with_rate_limit(Duration::from_millis(50));

        let started = Instant::now();
        let domains: Vec<String> = (1..=4)
            .map(|i| format!("seer-rl-{}.invalid", i))
            .collect();
        let results = executor.execute_ssl(domains).await;
        let elapsed = started.elapsed();

        assert_eq!(results.len(), 4);
        assert!(
            elapsed < Duration::from_secs(2),
            "rate-limited dispatch should run in parallel; took {:?}",
            elapsed
        );
    }

    /// An unresolvable host yields a failed result that still names its operation.
    #[tokio::test]
    async fn execute_ssl_failure_path_for_unresolvable_host() {
        let host = "seer-bulk-ssl-test.invalid";
        let executor = BulkExecutor::new().with_rate_limit(Duration::ZERO);

        let results = executor.execute_ssl(vec![host.to_string()]).await;

        assert_eq!(results.len(), 1);
        let r = &results[0];
        assert!(!r.success, "expected failure, got success");
        assert!(r.data.is_none(), "expected no data on failure");
        let err = r.error.as_deref().unwrap_or("");
        assert!(!err.is_empty(), "expected non-empty error, got: {:?}", err);
        assert!(
            matches!(r.operation, BulkOperation::Ssl { ref domain } if domain == host),
            "expected Ssl variant, got {:?}",
            r.operation
        );
    }

    /// Live check against cloudflare.com; ignored by default.
    #[tokio::test]
    #[ignore = "live network: hits cloudflare.com:443; run with --ignored"]
    async fn execute_ssl_live_cloudflare_has_non_empty_chain() {
        let results = BulkExecutor::new()
            .execute_ssl(vec!["cloudflare.com".to_string()])
            .await;

        assert_eq!(results.len(), 1);
        let r = &results[0];
        assert!(r.success, "expected success, got error: {:?}", r.error);
        let Some(BulkResultData::Ssl(ref report)) = r.data else {
            panic!("expected Ssl data, got {:?}", r.data);
        };
        assert!(!report.chain.is_empty(), "chain must not be empty");
        assert!(report.is_valid, "cloudflare leaf cert should be valid");
        assert!(
            report.days_until_expiry > 0,
            "cert should still be valid in the future, got {} days",
            report.days_until_expiry
        );
        assert!(
            !report.san_names.is_empty(),
            "cloudflare cert should have SAN entries"
        );
    }
}