// seer_core/bulk/executor.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::sync::Mutex;
8use tokio::time::{interval, MissedTickBehavior};
9use tracing::{debug, info, instrument};
10
11use crate::availability::{AvailabilityChecker, AvailabilityResult};
12use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
13use crate::error::Result;
14use crate::lookup::{LookupResult, SmartLookup};
15use crate::rdap::{RdapClient, RdapResponse};
16use crate::ssl::{SslChecker, SslReport};
17use crate::status::{StatusClient, StatusResponse};
18use crate::whois::{WhoisClient, WhoisResponse};
19
/// Progress hook invoked by [`BulkExecutor::execute`] once per finished
/// operation, with `(completed_so_far, total, domain)`.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync + 'static>;
21
22/// A single operation to perform in a bulk execution batch.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(tag = "type", rename_all = "snake_case")]
25pub enum BulkOperation {
26    Whois {
27        domain: String,
28    },
29    Rdap {
30        domain: String,
31    },
32    Dns {
33        domain: String,
34        record_type: RecordType,
35    },
36    Propagation {
37        domain: String,
38        record_type: RecordType,
39    },
40    Lookup {
41        domain: String,
42    },
43    Status {
44        domain: String,
45    },
46    Avail {
47        domain: String,
48    },
49    Info {
50        domain: String,
51    },
52    Ssl {
53        domain: String,
54    },
55}
56
57/// The data returned from a bulk operation (varies by operation type).
58#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
60pub enum BulkResultData {
61    Whois(WhoisResponse),
62    Rdap(Box<RdapResponse>),
63    Dns(Vec<DnsRecord>),
64    Propagation(PropagationResult),
65    Lookup(LookupResult),
66    Status(StatusResponse),
67    Avail(AvailabilityResult),
68    Info(crate::domain_info::DomainInfo),
69    Ssl(SslReport),
70}
71
72/// Result of a single operation within a bulk execution.
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct BulkResult {
75    pub operation: BulkOperation,
76    pub success: bool,
77    pub data: Option<BulkResultData>,
78    pub error: Option<String>,
79    pub duration_ms: u64,
80}
81
82/// Executes bulk operations concurrently with rate limiting.
83#[derive(Debug, Clone)]
84pub struct BulkExecutor {
85    concurrency: usize,
86    rate_limit_delay: Duration,
87    whois_client: WhoisClient,
88    rdap_client: RdapClient,
89    dns_resolver: DnsResolver,
90    propagation_checker: PropagationChecker,
91    smart_lookup: SmartLookup,
92    status_client: StatusClient,
93    availability_checker: AvailabilityChecker,
94    ssl_checker: SslChecker,
95}
96
97impl Default for BulkExecutor {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl BulkExecutor {
104    pub fn new() -> Self {
105        Self {
106            concurrency: 10,
107            rate_limit_delay: Duration::from_millis(100),
108            whois_client: WhoisClient::new(),
109            rdap_client: RdapClient::new(),
110            dns_resolver: DnsResolver::new(),
111            propagation_checker: PropagationChecker::new(),
112            smart_lookup: SmartLookup::new(),
113            status_client: StatusClient::new(),
114            availability_checker: AvailabilityChecker::new(),
115            ssl_checker: SslChecker::new(),
116        }
117    }
118
119    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
120        self.concurrency = concurrency.clamp(1, 50);
121        self
122    }
123
124    pub fn with_rate_limit(mut self, delay: Duration) -> Self {
125        self.rate_limit_delay = delay;
126        self
127    }
128
129    #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
130    pub async fn execute(
131        &self,
132        operations: Vec<BulkOperation>,
133        progress: Option<ProgressCallback>,
134    ) -> Vec<BulkResult> {
135        let total = operations.len();
136        let completed = Arc::new(AtomicUsize::new(0));
137
138        info!("bulk operation started");
139        debug!(
140            total = total,
141            concurrency = self.concurrency,
142            "Starting bulk execution"
143        );
144
145        // Per-batch rate limiter. The previous design slept `rate_limit_delay`
146        // INSIDE every per-item future, but those futures run concurrently
147        // under `buffer_unordered`, so the sleep just added uniform latency
148        // and did not serialize dispatch at all. A shared `Interval` ticks
149        // at the requested rate, and each task acquires the next tick before
150        // dispatching the operation — concurrency still applies on the
151        // execution side, but dispatch is rate-limited globally.
152        let limiter = if self.rate_limit_delay.is_zero() {
153            None
154        } else {
155            let mut iv = interval(self.rate_limit_delay);
156            // The first `tick()` resolves immediately, so the first item
157            // dispatches without waiting — desirable. Skip catch-up bursts.
158            iv.set_missed_tick_behavior(MissedTickBehavior::Delay);
159            Some(Arc::new(Mutex::new(iv)))
160        };
161
162        let results: Vec<BulkResult> = stream::iter(operations)
163            .map(|op| {
164                let completed = completed.clone();
165                let progress = progress.as_ref();
166                let limiter = limiter.clone();
167                let whois_client = &self.whois_client;
168                let rdap_client = &self.rdap_client;
169                let dns_resolver = &self.dns_resolver;
170                let propagation_checker = &self.propagation_checker;
171                let smart_lookup = &self.smart_lookup;
172                let status_client = &self.status_client;
173                let availability_checker = &self.availability_checker;
174                let ssl_checker = &self.ssl_checker;
175
176                async move {
177                    // Rate-limited dispatch: wait for our turn at the shared
178                    // interval before the operation starts. With no limiter
179                    // (zero delay) every task dispatches immediately and the
180                    // semaphore-like effect comes from `buffer_unordered`.
181                    if let Some(limiter) = &limiter {
182                        limiter.lock().await.tick().await;
183                    }
184
185                    let start = std::time::Instant::now();
186                    let result = execute_operation(
187                        &op,
188                        &Clients {
189                            whois: whois_client,
190                            rdap: rdap_client,
191                            dns: dns_resolver,
192                            propagation: propagation_checker,
193                            lookup: smart_lookup,
194                            status: status_client,
195                            avail: availability_checker,
196                            ssl: ssl_checker,
197                        },
198                    )
199                    .await;
200                    let duration_ms = start.elapsed().as_millis() as u64;
201
202                    let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
203
204                    if let Some(progress) = progress {
205                        let desc = match &op {
206                            BulkOperation::Whois { domain }
207                            | BulkOperation::Rdap { domain }
208                            | BulkOperation::Dns { domain, .. }
209                            | BulkOperation::Propagation { domain, .. }
210                            | BulkOperation::Lookup { domain }
211                            | BulkOperation::Status { domain }
212                            | BulkOperation::Avail { domain }
213                            | BulkOperation::Info { domain }
214                            | BulkOperation::Ssl { domain } => domain.as_str(),
215                        };
216                        progress(count, total, desc);
217                    }
218
219                    match result {
220                        Ok(data) => BulkResult {
221                            operation: op,
222                            success: true,
223                            data: Some(data),
224                            error: None,
225                            duration_ms,
226                        },
227                        Err(e) => {
228                            debug!(error = %e, "Bulk operation failed");
229                            BulkResult {
230                                operation: op,
231                                success: false,
232                                data: None,
233                                error: Some(e.to_string()),
234                                duration_ms,
235                            }
236                        }
237                    }
238                }
239            })
240            .buffer_unordered(self.concurrency)
241            .collect()
242            .await;
243
244        let succeeded = results.iter().filter(|r| r.success).count();
245        let failed = results.iter().filter(|r| !r.success).count();
246        info!(
247            total = total,
248            succeeded = succeeded,
249            failed = failed,
250            "bulk operation completed"
251        );
252
253        results
254    }
255
256    pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
257        let operations = domains
258            .into_iter()
259            .map(|domain| BulkOperation::Whois { domain })
260            .collect();
261        self.execute(operations, None).await
262    }
263
264    pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
265        let operations = domains
266            .into_iter()
267            .map(|domain| BulkOperation::Rdap { domain })
268            .collect();
269        self.execute(operations, None).await
270    }
271
272    pub async fn execute_dns(
273        &self,
274        domains: Vec<String>,
275        record_type: RecordType,
276    ) -> Vec<BulkResult> {
277        let operations = domains
278            .into_iter()
279            .map(|domain| BulkOperation::Dns {
280                domain,
281                record_type,
282            })
283            .collect();
284        self.execute(operations, None).await
285    }
286
287    pub async fn execute_propagation(
288        &self,
289        domains: Vec<String>,
290        record_type: RecordType,
291    ) -> Vec<BulkResult> {
292        let operations = domains
293            .into_iter()
294            .map(|domain| BulkOperation::Propagation {
295                domain,
296                record_type,
297            })
298            .collect();
299        self.execute(operations, None).await
300    }
301
302    pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
303        let operations = domains
304            .into_iter()
305            .map(|domain| BulkOperation::Lookup { domain })
306            .collect();
307        self.execute(operations, None).await
308    }
309
310    pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
311        let operations = domains
312            .into_iter()
313            .map(|domain| BulkOperation::Status { domain })
314            .collect();
315        self.execute(operations, None).await
316    }
317
318    pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
319        let operations = domains
320            .into_iter()
321            .map(|domain| BulkOperation::Avail { domain })
322            .collect();
323        self.execute(operations, None).await
324    }
325
326    pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
327        let operations = domains
328            .into_iter()
329            .map(|domain| BulkOperation::Info { domain })
330            .collect();
331        self.execute(operations, None).await
332    }
333
334    pub async fn execute_ssl(&self, domains: Vec<String>) -> Vec<BulkResult> {
335        let operations = domains
336            .into_iter()
337            .map(|domain| BulkOperation::Ssl { domain })
338            .collect();
339        self.execute(operations, None).await
340    }
341}
342
343struct Clients<'a> {
344    whois: &'a WhoisClient,
345    rdap: &'a RdapClient,
346    dns: &'a DnsResolver,
347    propagation: &'a PropagationChecker,
348    lookup: &'a SmartLookup,
349    status: &'a StatusClient,
350    avail: &'a AvailabilityChecker,
351    ssl: &'a SslChecker,
352}
353
354async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
355    match op {
356        BulkOperation::Whois { domain } => {
357            let result = clients.whois.lookup(domain).await?;
358            Ok(BulkResultData::Whois(result))
359        }
360        BulkOperation::Rdap { domain } => {
361            let result = clients.rdap.lookup_domain(domain).await?;
362            Ok(BulkResultData::Rdap(Box::new(result)))
363        }
364        BulkOperation::Dns {
365            domain,
366            record_type,
367        } => {
368            let result = clients.dns.resolve(domain, *record_type, None).await?;
369            Ok(BulkResultData::Dns(result))
370        }
371        BulkOperation::Propagation {
372            domain,
373            record_type,
374        } => {
375            let result = clients.propagation.check(domain, *record_type).await?;
376            Ok(BulkResultData::Propagation(result))
377        }
378        BulkOperation::Lookup { domain } => {
379            let result = clients.lookup.lookup(domain).await?;
380            Ok(BulkResultData::Lookup(result))
381        }
382        BulkOperation::Status { domain } => {
383            let result = clients.status.check(domain).await?;
384            Ok(BulkResultData::Status(result))
385        }
386        BulkOperation::Avail { domain } => {
387            let result = clients.avail.check(domain).await?;
388            Ok(BulkResultData::Avail(result))
389        }
390        BulkOperation::Info { domain } => {
391            let result = clients.lookup.lookup(domain).await?;
392            Ok(BulkResultData::Info(
393                crate::domain_info::DomainInfo::from_lookup_result(&result),
394            ))
395        }
396        BulkOperation::Ssl { domain } => {
397            let result = clients.ssl.check(domain).await?;
398            Ok(BulkResultData::Ssl(result))
399        }
400    }
401}
402
/// Keywords commonly used as CSV header labels for a domain column.
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// Returns the first comma-separated column of `line`.
///
/// The column is trimmed of surrounding whitespace and of any enclosing
/// double quotes, as written by spreadsheet CSV exports (`"example.com",live`).
fn first_csv_column(line: &str) -> &str {
    let head = line.split_once(',').map_or(line, |(head, _)| head);
    head.trim().trim_matches('"').trim()
}

/// Returns true if `first` looks like a CSV header row rather than a real domain.
///
/// Heuristic: take the first column (see [`first_csv_column`]). Real domains
/// always contain a `.` (e.g. "example.com", "domain.com"), while a bare
/// keyword like "domain" or "hostname" does not. The row counts as a header
/// only when the first column has no dot AND matches a known header keyword,
/// case-insensitively.
fn is_csv_header_row(first: &str) -> bool {
    let first_col = first_csv_column(first);
    // Keywords are ASCII, so an ASCII case-insensitive compare is sufficient
    // and avoids allocating a lowercased copy.
    !first_col.contains('.')
        && HEADER_KEYWORDS
            .iter()
            .any(|keyword| keyword.eq_ignore_ascii_case(first_col))
}

/// Parses a list of domains from plain-text or CSV file content.
///
/// Blank lines and lines starting with `#` are skipped. Every other line
/// contributes its first CSV column, with quotes stripped. Entries without a
/// `.` are discarded, which also drops bare-keyword headers such as `domain`
/// or `hostname`. A leading UTF-8 byte-order mark is ignored.
pub fn parse_domains_from_file(content: &str) -> Vec<String> {
    // Editors and spreadsheet exports often prepend a UTF-8 BOM. `str::trim`
    // does not strip U+FEFF (it is not Unicode whitespace), so without this
    // the first domain would carry an invisible prefix.
    let content = content.strip_prefix('\u{feff}').unwrap_or(content);

    let mut domains: Vec<String> = content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .map(first_csv_column)
        .filter(|domain| domain.contains('.'))
        .map(String::from)
        .collect();

    // The dot filter above already removes bare-keyword headers. Any dotted
    // first entry is treated as a real domain (see
    // `domain_dot_com_is_not_dropped_as_header`). This guard is a
    // belt-and-suspenders check that stays correct if the filter ever changes.
    if matches!(domains.first(), Some(first) if is_csv_header_row(first)) {
        domains.remove(0);
    }

    domains
}
451
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_domains_from_file() {
        let content = r#"
# This is a comment
example1.com
google2.com
  whitespace3.com
invalid
csv,format,example.org
"#;

        let domains = parse_domains_from_file(content);
        // "invalid" and "csv" carry no dot, so only three entries survive.
        assert_eq!(domains.len(), 3);
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(domains.contains(&expected.to_string()));
        }
    }

    #[test]
    fn test_parse_domains_skip_bare_header() {
        // A bare-keyword header ("domain") has no dot and is filtered out.
        let domains = parse_domains_from_file("domain\nexample.com\n");
        assert_eq!(domains, vec!["example.com"]);
    }

    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        // The header's first column is a bare keyword without a dot.
        let domains = parse_domains_from_file("domain,status\ngoogle.com,live\n");
        assert_eq!(domains, vec!["google.com"]);
        assert!(!domains.iter().any(|d| d == "domain"));
    }

    #[test]
    fn first_domain_with_no_digits_is_kept() {
        // Regression: an earlier heuristic dropped the first entry whenever it
        // contained no ASCII digits, losing "google.com", "amazon.com", ...
        assert_eq!(
            parse_domains_from_file("google.com\namazon.com\n"),
            vec!["google.com", "amazon.com"]
        );
    }

    #[test]
    fn header_row_named_hostname_is_dropped() {
        assert_eq!(
            parse_domains_from_file("hostname\nexample.com\n"),
            vec!["example.com"]
        );
    }

    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        // "domain.com" contains a dot, so it is a real domain, not a header.
        assert_eq!(
            parse_domains_from_file("domain.com\nexample.com\n"),
            vec!["domain.com", "example.com"]
        );
    }

    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        for row in ["domain", "Hostname", "URL", "domain,status,notes", "  host  "] {
            assert!(is_csv_header_row(row));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        for row in ["domain.com", "google.com", "host.name"] {
            assert!(!is_csv_header_row(row));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        for row in ["example", "mydata"] {
            assert!(!is_csv_header_row(row));
        }
    }

    #[tokio::test]
    async fn execute_ssl_failure_path_for_unresolvable_host() {
        // The SSL bulk arm must report an unresolvable host as success=false
        // with a non-empty error. The IETF-reserved `.invalid` TLD never
        // resolves, which keeps the outcome deterministic.
        const HOST: &str = "seer-bulk-ssl-test.invalid";
        let executor = BulkExecutor::new().with_rate_limit(Duration::ZERO);
        let results = executor.execute_ssl(vec![HOST.to_string()]).await;
        assert_eq!(results.len(), 1);

        let outcome = &results[0];
        assert!(!outcome.success, "expected failure, got success");
        assert!(outcome.data.is_none(), "expected no data on failure");
        let err = outcome.error.as_deref().unwrap_or("");
        assert!(!err.is_empty(), "expected non-empty error, got: {:?}", err);
        assert!(
            matches!(&outcome.operation, BulkOperation::Ssl { domain } if domain == HOST),
            "expected Ssl variant, got {:?}",
            outcome.operation
        );
    }

    #[tokio::test]
    #[ignore = "live network: hits cloudflare.com:443; run with --ignored"]
    async fn execute_ssl_live_cloudflare_has_non_empty_chain() {
        let executor = BulkExecutor::new();
        let results = executor
            .execute_ssl(vec!["cloudflare.com".to_string()])
            .await;
        assert_eq!(results.len(), 1);

        let outcome = &results[0];
        assert!(outcome.success, "expected success, got error: {:?}", outcome.error);
        let report = match &outcome.data {
            Some(BulkResultData::Ssl(report)) => report,
            other => panic!("expected Ssl data, got {:?}", other),
        };
        assert!(!report.chain.is_empty(), "chain must not be empty");
        assert!(report.is_valid, "cloudflare leaf cert should be valid");
        assert!(
            report.days_until_expiry > 0,
            "cert should still be valid in the future, got {} days",
            report.days_until_expiry
        );
        assert!(
            !report.san_names.is_empty(),
            "cloudflare cert should have SAN entries"
        );
    }
}