Skip to main content

seer_core/bulk/
executor.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::time::sleep;
8use tracing::{debug, info, instrument};
9
10use crate::availability::{AvailabilityChecker, AvailabilityResult};
11use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
12use crate::error::Result;
13use crate::lookup::{LookupResult, SmartLookup};
14use crate::rdap::{RdapClient, RdapResponse};
15use crate::ssl::{SslChecker, SslReport};
16use crate::status::{StatusClient, StatusResponse};
17use crate::whois::{WhoisClient, WhoisResponse};
18
/// Callback invoked by [`BulkExecutor::execute`] each time an operation finishes.
///
/// It is called as `progress(completed, total, domain)`: the number of
/// operations finished so far (including this one), the total number of
/// operations in the batch, and the domain of the operation that just finished.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync>;
20
/// A single operation to perform in a bulk execution batch.
///
/// Serialized with an internal `type` tag holding the snake_case variant
/// name, alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BulkOperation {
    /// WHOIS lookup for `domain`.
    Whois {
        domain: String,
    },
    /// RDAP domain lookup for `domain`.
    Rdap {
        domain: String,
    },
    /// DNS resolution of `record_type` records for `domain`.
    Dns {
        domain: String,
        record_type: RecordType,
    },
    /// Propagation check of `record_type` records for `domain`.
    Propagation {
        domain: String,
        record_type: RecordType,
    },
    /// Smart lookup for `domain`, returning the raw lookup result.
    Lookup {
        domain: String,
    },
    /// Status check for `domain`.
    Status {
        domain: String,
    },
    /// Availability check for `domain`.
    Avail {
        domain: String,
    },
    /// Same smart lookup as `Lookup`, but the result is converted into a
    /// `DomainInfo` summary.
    Info {
        domain: String,
    },
    /// SSL check for `domain`.
    Ssl {
        domain: String,
    },
}
55
/// The data returned from a bulk operation (varies by operation type).
///
/// Each variant carries the payload produced by the [`BulkOperation`] variant
/// of the same name. Serialized adjacently tagged: the snake_case variant name
/// goes in `result_type` and the payload in `data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
pub enum BulkResultData {
    /// Response of a WHOIS lookup.
    Whois(WhoisResponse),
    /// Response of an RDAP domain lookup (boxed).
    Rdap(Box<RdapResponse>),
    /// Records returned by a DNS resolution.
    Dns(Vec<DnsRecord>),
    /// Outcome of a propagation check.
    Propagation(PropagationResult),
    /// Raw result of a smart lookup.
    Lookup(LookupResult),
    /// Response of a status check.
    Status(StatusResponse),
    /// Outcome of an availability check.
    Avail(AvailabilityResult),
    /// Domain summary derived from a smart lookup result.
    Info(crate::domain_info::DomainInfo),
    /// Report produced by an SSL check.
    Ssl(SslReport),
}
70
/// Result of a single operation within a bulk execution.
///
/// As built by [`BulkExecutor::execute`], a successful result has
/// `success == true`, `data == Some(..)` and `error == None`; a failed one has
/// `success == false`, `data == None` and `error == Some(..)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkResult {
    /// The operation this result belongs to.
    pub operation: BulkOperation,
    /// Whether the operation completed without error.
    pub success: bool,
    /// Payload of a successful operation.
    pub data: Option<BulkResultData>,
    /// Display text of the error for a failed operation.
    pub error: Option<String>,
    /// Wall-clock time spent running the operation, in milliseconds. The
    /// rate-limit delay that precedes the operation is not included.
    pub duration_ms: u64,
}
80
/// Executes bulk operations concurrently with rate limiting.
///
/// Holds one client per supported operation type; the clients are borrowed by
/// every operation in a batch.
#[derive(Debug, Clone)]
pub struct BulkExecutor {
    /// Maximum number of operations in flight at once.
    concurrency: usize,
    /// Delay each operation waits before it starts; zero disables the wait.
    rate_limit_delay: Duration,
    // Client used by `BulkOperation::Whois`.
    whois_client: WhoisClient,
    // Client used by `BulkOperation::Rdap`.
    rdap_client: RdapClient,
    // Resolver used by `BulkOperation::Dns`.
    dns_resolver: DnsResolver,
    // Checker used by `BulkOperation::Propagation`.
    propagation_checker: PropagationChecker,
    // Lookup used by both `BulkOperation::Lookup` and `BulkOperation::Info`.
    smart_lookup: SmartLookup,
    // Client used by `BulkOperation::Status`.
    status_client: StatusClient,
    // Checker used by `BulkOperation::Avail`.
    availability_checker: AvailabilityChecker,
    // Checker used by `BulkOperation::Ssl`.
    ssl_checker: SslChecker,
}
95
96impl Default for BulkExecutor {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
102impl BulkExecutor {
103    pub fn new() -> Self {
104        Self {
105            concurrency: 10,
106            rate_limit_delay: Duration::from_millis(100),
107            whois_client: WhoisClient::new(),
108            rdap_client: RdapClient::new(),
109            dns_resolver: DnsResolver::new(),
110            propagation_checker: PropagationChecker::new(),
111            smart_lookup: SmartLookup::new(),
112            status_client: StatusClient::new(),
113            availability_checker: AvailabilityChecker::new(),
114            ssl_checker: SslChecker::new(),
115        }
116    }
117
118    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
119        self.concurrency = concurrency.clamp(1, 50);
120        self
121    }
122
123    pub fn with_rate_limit(mut self, delay: Duration) -> Self {
124        self.rate_limit_delay = delay;
125        self
126    }
127
128    #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
129    pub async fn execute(
130        &self,
131        operations: Vec<BulkOperation>,
132        progress: Option<ProgressCallback>,
133    ) -> Vec<BulkResult> {
134        let total = operations.len();
135        let completed = Arc::new(AtomicUsize::new(0));
136
137        info!("bulk operation started");
138        debug!(
139            total = total,
140            concurrency = self.concurrency,
141            "Starting bulk execution"
142        );
143
144        let results: Vec<BulkResult> = stream::iter(operations)
145            .map(|op| {
146                let completed = completed.clone();
147                let progress = progress.as_ref();
148                let rate_limit_delay = self.rate_limit_delay;
149                let whois_client = &self.whois_client;
150                let rdap_client = &self.rdap_client;
151                let dns_resolver = &self.dns_resolver;
152                let propagation_checker = &self.propagation_checker;
153                let smart_lookup = &self.smart_lookup;
154                let status_client = &self.status_client;
155                let availability_checker = &self.availability_checker;
156                let ssl_checker = &self.ssl_checker;
157
158                async move {
159                    // Rate limiting delay
160                    if !rate_limit_delay.is_zero() {
161                        sleep(rate_limit_delay).await;
162                    }
163
164                    let start = std::time::Instant::now();
165                    let result = execute_operation(
166                        &op,
167                        &Clients {
168                            whois: whois_client,
169                            rdap: rdap_client,
170                            dns: dns_resolver,
171                            propagation: propagation_checker,
172                            lookup: smart_lookup,
173                            status: status_client,
174                            avail: availability_checker,
175                            ssl: ssl_checker,
176                        },
177                    )
178                    .await;
179                    let duration_ms = start.elapsed().as_millis() as u64;
180
181                    let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
182
183                    if let Some(progress) = progress {
184                        let desc = match &op {
185                            BulkOperation::Whois { domain }
186                            | BulkOperation::Rdap { domain }
187                            | BulkOperation::Dns { domain, .. }
188                            | BulkOperation::Propagation { domain, .. }
189                            | BulkOperation::Lookup { domain }
190                            | BulkOperation::Status { domain }
191                            | BulkOperation::Avail { domain }
192                            | BulkOperation::Info { domain }
193                            | BulkOperation::Ssl { domain } => domain.as_str(),
194                        };
195                        progress(count, total, desc);
196                    }
197
198                    match result {
199                        Ok(data) => BulkResult {
200                            operation: op,
201                            success: true,
202                            data: Some(data),
203                            error: None,
204                            duration_ms,
205                        },
206                        Err(e) => {
207                            debug!(error = %e, "Bulk operation failed");
208                            BulkResult {
209                                operation: op,
210                                success: false,
211                                data: None,
212                                error: Some(e.to_string()),
213                                duration_ms,
214                            }
215                        }
216                    }
217                }
218            })
219            .buffer_unordered(self.concurrency)
220            .collect()
221            .await;
222
223        let succeeded = results.iter().filter(|r| r.success).count();
224        let failed = results.iter().filter(|r| !r.success).count();
225        info!(
226            total = total,
227            succeeded = succeeded,
228            failed = failed,
229            "bulk operation completed"
230        );
231
232        results
233    }
234
235    pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
236        let operations = domains
237            .into_iter()
238            .map(|domain| BulkOperation::Whois { domain })
239            .collect();
240        self.execute(operations, None).await
241    }
242
243    pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
244        let operations = domains
245            .into_iter()
246            .map(|domain| BulkOperation::Rdap { domain })
247            .collect();
248        self.execute(operations, None).await
249    }
250
251    pub async fn execute_dns(
252        &self,
253        domains: Vec<String>,
254        record_type: RecordType,
255    ) -> Vec<BulkResult> {
256        let operations = domains
257            .into_iter()
258            .map(|domain| BulkOperation::Dns {
259                domain,
260                record_type,
261            })
262            .collect();
263        self.execute(operations, None).await
264    }
265
266    pub async fn execute_propagation(
267        &self,
268        domains: Vec<String>,
269        record_type: RecordType,
270    ) -> Vec<BulkResult> {
271        let operations = domains
272            .into_iter()
273            .map(|domain| BulkOperation::Propagation {
274                domain,
275                record_type,
276            })
277            .collect();
278        self.execute(operations, None).await
279    }
280
281    pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
282        let operations = domains
283            .into_iter()
284            .map(|domain| BulkOperation::Lookup { domain })
285            .collect();
286        self.execute(operations, None).await
287    }
288
289    pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
290        let operations = domains
291            .into_iter()
292            .map(|domain| BulkOperation::Status { domain })
293            .collect();
294        self.execute(operations, None).await
295    }
296
297    pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
298        let operations = domains
299            .into_iter()
300            .map(|domain| BulkOperation::Avail { domain })
301            .collect();
302        self.execute(operations, None).await
303    }
304
305    pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
306        let operations = domains
307            .into_iter()
308            .map(|domain| BulkOperation::Info { domain })
309            .collect();
310        self.execute(operations, None).await
311    }
312
313    pub async fn execute_ssl(&self, domains: Vec<String>) -> Vec<BulkResult> {
314        let operations = domains
315            .into_iter()
316            .map(|domain| BulkOperation::Ssl { domain })
317            .collect();
318        self.execute(operations, None).await
319    }
320}
321
/// Borrowed bundle of the clients an operation may need.
///
/// Passed to `execute_operation`, which picks the field matching the
/// operation's variant.
struct Clients<'a> {
    // Used for `BulkOperation::Whois`.
    whois: &'a WhoisClient,
    // Used for `BulkOperation::Rdap`.
    rdap: &'a RdapClient,
    // Used for `BulkOperation::Dns`.
    dns: &'a DnsResolver,
    // Used for `BulkOperation::Propagation`.
    propagation: &'a PropagationChecker,
    // Used for both `BulkOperation::Lookup` and `BulkOperation::Info`.
    lookup: &'a SmartLookup,
    // Used for `BulkOperation::Status`.
    status: &'a StatusClient,
    // Used for `BulkOperation::Avail`.
    avail: &'a AvailabilityChecker,
    // Used for `BulkOperation::Ssl`.
    ssl: &'a SslChecker,
}
332
333async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
334    match op {
335        BulkOperation::Whois { domain } => {
336            let result = clients.whois.lookup(domain).await?;
337            Ok(BulkResultData::Whois(result))
338        }
339        BulkOperation::Rdap { domain } => {
340            let result = clients.rdap.lookup_domain(domain).await?;
341            Ok(BulkResultData::Rdap(Box::new(result)))
342        }
343        BulkOperation::Dns {
344            domain,
345            record_type,
346        } => {
347            let result = clients.dns.resolve(domain, *record_type, None).await?;
348            Ok(BulkResultData::Dns(result))
349        }
350        BulkOperation::Propagation {
351            domain,
352            record_type,
353        } => {
354            let result = clients.propagation.check(domain, *record_type).await?;
355            Ok(BulkResultData::Propagation(result))
356        }
357        BulkOperation::Lookup { domain } => {
358            let result = clients.lookup.lookup(domain).await?;
359            Ok(BulkResultData::Lookup(result))
360        }
361        BulkOperation::Status { domain } => {
362            let result = clients.status.check(domain).await?;
363            Ok(BulkResultData::Status(result))
364        }
365        BulkOperation::Avail { domain } => {
366            let result = clients.avail.check(domain).await?;
367            Ok(BulkResultData::Avail(result))
368        }
369        BulkOperation::Info { domain } => {
370            let result = clients.lookup.lookup(domain).await?;
371            Ok(BulkResultData::Info(
372                crate::domain_info::DomainInfo::from_lookup_result(&result),
373            ))
374        }
375        BulkOperation::Ssl { domain } => {
376            let result = clients.ssl.check(domain).await?;
377            Ok(BulkResultData::Ssl(result))
378        }
379    }
380}
381
/// Labels that commonly head the domain column of a CSV file.
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// Decides whether `first` is a CSV header row instead of a domain entry.
///
/// Only the text before the first comma is examined, with surrounding
/// whitespace removed. That column counts as a header label when it contains
/// no `.` (so "domain.com" is never a header) and, ignoring case, equals one
/// of [`HEADER_KEYWORDS`].
fn is_csv_header_row(first: &str) -> bool {
    let column = match first.split_once(',') {
        Some((head, _rest)) => head,
        None => first,
    }
    .trim();

    // A dotted value is treated as a real domain, whatever it spells.
    !column.contains('.') && {
        let lowered = column.to_lowercase();
        HEADER_KEYWORDS.iter().any(|keyword| *keyword == lowered)
    }
}
400
401pub fn parse_domains_from_file(content: &str) -> Vec<String> {
402    let mut domains: Vec<String> = content
403        .lines()
404        .map(|line| line.trim())
405        .filter(|line| !line.is_empty() && !line.starts_with('#'))
406        .map(|line| {
407            // Handle CSV format (take first column)
408            line.split(',').next().unwrap_or(line).trim().to_string()
409        })
410        .filter(|domain| domain.contains('.'))
411        .collect();
412
413    // A bare-keyword CSV header like "domain" / "hostname" has no dot and is
414    // already dropped by the filter above. `is_csv_header_row` additionally
415    // covers the edge case where the first surviving entry is a dotted header
416    // (rare, and the current heuristic treats such values as real domains —
417    // see `domain_dot_com_is_not_dropped_as_header`). The guard below is a
418    // belt-and-suspenders check that stays correct if the upstream filter
419    // ever changes.
420    if domains
421        .first()
422        .map(|d| is_csv_header_row(d))
423        .unwrap_or(false)
424    {
425        domains.remove(0);
426    }
427
428    domains
429}
430
#[cfg(test)]
mod tests {
    use super::*;

    /// Comments, blank lines, dot-less tokens and CSV rows whose first column
    /// has no dot are all skipped; leading whitespace is trimmed.
    #[test]
    fn test_parse_domains_from_file() {
        let content = r#"
# This is a comment
example1.com
google2.com
  whitespace3.com
invalid
csv,format,example.org
"#;

        let parsed = parse_domains_from_file(content);
        assert_eq!(parsed.len(), 3);
        // "invalid" and "csv" are filtered out because they don't contain a dot
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(parsed.contains(&expected.to_string()));
        }
    }

    /// A bare-keyword header ("domain") never survives the dot filter.
    #[test]
    fn test_parse_domains_skip_bare_header() {
        let parsed = parse_domains_from_file("domain\nexample.com\n");
        assert_eq!(parsed, vec!["example.com"]);
    }

    /// A multi-column header whose first column is a bare keyword has no dot
    /// in column 1, so the whole row is dropped.
    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        let parsed = parse_domains_from_file("domain,status\ngoogle.com,live\n");
        assert_eq!(parsed, vec!["google.com"]);
        assert!(!parsed.contains(&"domain".to_string()));
    }

    /// Regression: an earlier heuristic dropped the first entry when it had
    /// no ASCII digits, losing "google.com" / "amazon.com" / ...
    #[test]
    fn first_domain_with_no_digits_is_kept() {
        let parsed = parse_domains_from_file("google.com\namazon.com\n");
        assert_eq!(parsed, vec!["google.com", "amazon.com"]);
    }

    #[test]
    fn header_row_named_hostname_is_dropped() {
        let parsed = parse_domains_from_file("hostname\nexample.com\n");
        assert_eq!(parsed, vec!["example.com"]);
    }

    /// "domain.com" has a dot, so it is a real domain rather than a header.
    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        let parsed = parse_domains_from_file("domain.com\nexample.com\n");
        assert_eq!(parsed, vec!["domain.com", "example.com"]);
    }

    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        assert!(is_csv_header_row("domain"));
        assert!(is_csv_header_row("Hostname"));
        assert!(is_csv_header_row("URL"));
        assert!(is_csv_header_row("domain,status,notes"));
        assert!(is_csv_header_row("  host  "));
    }

    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        assert!(!is_csv_header_row("domain.com"));
        assert!(!is_csv_header_row("google.com"));
        assert!(!is_csv_header_row("host.name"));
    }

    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        assert!(!is_csv_header_row("example"));
        assert!(!is_csv_header_row("mydata"));
    }

    /// Checks the wiring of the SSL bulk arm: a hostname under the
    /// IETF-reserved `.invalid` TLD must come back as `success == false` with
    /// a non-empty error string and the original operation attached.
    #[tokio::test]
    async fn execute_ssl_failure_path_for_unresolvable_host() {
        let executor = BulkExecutor::new().with_rate_limit(Duration::ZERO);
        let hosts = vec!["seer-bulk-ssl-test.invalid".to_string()];
        let results = executor.execute_ssl(hosts).await;

        assert_eq!(results.len(), 1);
        let outcome = &results[0];
        assert!(!outcome.success, "expected failure, got success");
        assert!(outcome.data.is_none(), "expected no data on failure");

        let err = outcome.error.as_deref().unwrap_or("");
        assert!(!err.is_empty(), "expected non-empty error, got: {:?}", err);
        assert!(
            matches!(outcome.operation, BulkOperation::Ssl { ref domain } if domain == "seer-bulk-ssl-test.invalid"),
            "expected Ssl variant, got {:?}",
            outcome.operation
        );
    }

    /// Live check against cloudflare.com; only runs when explicitly requested.
    #[tokio::test]
    #[ignore = "live network: hits cloudflare.com:443; run with --ignored"]
    async fn execute_ssl_live_cloudflare_has_non_empty_chain() {
        let executor = BulkExecutor::new();
        let hosts = vec!["cloudflare.com".to_string()];
        let results = executor.execute_ssl(hosts).await;

        assert_eq!(results.len(), 1);
        let outcome = &results[0];
        assert!(
            outcome.success,
            "expected success, got error: {:?}",
            outcome.error
        );
        let Some(BulkResultData::Ssl(ref report)) = outcome.data else {
            panic!("expected Ssl data, got {:?}", outcome.data);
        };
        assert!(!report.chain.is_empty(), "chain must not be empty");
        assert!(report.is_valid, "cloudflare leaf cert should be valid");
        assert!(
            report.days_until_expiry > 0,
            "cert should still be valid in the future, got {} days",
            report.days_until_expiry
        );
        assert!(
            !report.san_names.is_empty(),
            "cloudflare cert should have SAN entries"
        );
    }
}