Skip to main content

seer_core/bulk/
executor.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::sync::Mutex;
8use tokio::time::{sleep_until, Instant as TokioInstant};
9use tracing::{debug, info, instrument};
10
11use crate::availability::{AvailabilityChecker, AvailabilityResult};
12use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
13use crate::error::Result;
14use crate::lookup::{LookupResult, SmartLookup};
15use crate::rdap::{RdapClient, RdapResponse};
16use crate::ssl::{SslChecker, SslReport};
17use crate::status::{StatusClient, StatusResponse};
18use crate::whois::{WhoisClient, WhoisResponse};
19
/// Progress hook handed to [`BulkExecutor::execute`].
///
/// It is called once per finished operation with, in order: the number of
/// operations completed so far, the total number of operations in the batch,
/// and the domain of the operation that just finished.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync>;
21
22/// A single operation to perform in a bulk execution batch.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(tag = "type", rename_all = "snake_case")]
25pub enum BulkOperation {
26    Whois {
27        domain: String,
28    },
29    Rdap {
30        domain: String,
31    },
32    Dns {
33        domain: String,
34        record_type: RecordType,
35    },
36    Propagation {
37        domain: String,
38        record_type: RecordType,
39    },
40    Lookup {
41        domain: String,
42    },
43    Status {
44        domain: String,
45    },
46    Avail {
47        domain: String,
48    },
49    Info {
50        domain: String,
51    },
52    Ssl {
53        domain: String,
54    },
55}
56
57/// The data returned from a bulk operation (varies by operation type).
58#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
60pub enum BulkResultData {
61    Whois(WhoisResponse),
62    Rdap(Box<RdapResponse>),
63    Dns(Vec<DnsRecord>),
64    Propagation(PropagationResult),
65    Lookup(LookupResult),
66    Status(StatusResponse),
67    Avail(AvailabilityResult),
68    Info(crate::domain_info::DomainInfo),
69    Ssl(SslReport),
70}
71
72/// Result of a single operation within a bulk execution.
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct BulkResult {
75    pub operation: BulkOperation,
76    pub success: bool,
77    pub data: Option<BulkResultData>,
78    pub error: Option<String>,
79    pub duration_ms: u64,
80}
81
82/// Executes bulk operations concurrently with rate limiting.
83#[derive(Debug, Clone)]
84pub struct BulkExecutor {
85    concurrency: usize,
86    rate_limit_delay: Duration,
87    whois_client: WhoisClient,
88    rdap_client: RdapClient,
89    dns_resolver: DnsResolver,
90    propagation_checker: PropagationChecker,
91    smart_lookup: SmartLookup,
92    status_client: StatusClient,
93    availability_checker: AvailabilityChecker,
94    ssl_checker: SslChecker,
95}
96
97impl Default for BulkExecutor {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl BulkExecutor {
104    pub fn new() -> Self {
105        Self {
106            concurrency: 10,
107            rate_limit_delay: Duration::from_millis(100),
108            whois_client: WhoisClient::new(),
109            rdap_client: RdapClient::new(),
110            dns_resolver: DnsResolver::new(),
111            propagation_checker: PropagationChecker::new(),
112            smart_lookup: SmartLookup::new(),
113            status_client: StatusClient::new(),
114            availability_checker: AvailabilityChecker::new(),
115            ssl_checker: SslChecker::new(),
116        }
117    }
118
119    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
120        self.concurrency = concurrency.clamp(1, 50);
121        self
122    }
123
124    pub fn with_rate_limit(mut self, delay: Duration) -> Self {
125        self.rate_limit_delay = delay;
126        self
127    }
128
129    #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
130    pub async fn execute(
131        &self,
132        operations: Vec<BulkOperation>,
133        progress: Option<ProgressCallback>,
134    ) -> Vec<BulkResult> {
135        let total = operations.len();
136        let completed = Arc::new(AtomicUsize::new(0));
137
138        info!("bulk operation started");
139        debug!(
140            total = total,
141            concurrency = self.concurrency,
142            "Starting bulk execution"
143        );
144
145        // Per-batch rate limiter. The naive `interval(...).tick().await` inside
146        // a `Mutex` would hold the lock across the await, serializing all
147        // dispatch through the lock at 1 / rate_limit_delay — exactly the
148        // regression v0.26.7 tried to fix from the other direction.
149        //
150        // Instead each task locks briefly to *claim* its dispatch slot (the
151        // next monotonically-increasing deadline at `rate_limit_delay`
152        // spacing), then releases the lock and `sleep_until`s its assigned
153        // slot. With concurrency=N and rate=D, N tasks can be sleeping
154        // simultaneously on N distinct deadlines, and execution begins at
155        // each task's deadline — so global dispatch rate is 1/D while up
156        // to N tasks run their operation concurrently.
157        let limiter = if self.rate_limit_delay.is_zero() {
158            None
159        } else {
160            Some(Arc::new(Mutex::new(None::<TokioInstant>)))
161        };
162        let rate_limit_delay = self.rate_limit_delay;
163
164        let results: Vec<BulkResult> = stream::iter(operations)
165            .map(|op| {
166                let completed = completed.clone();
167                let progress = progress.as_ref();
168                let limiter = limiter.clone();
169                let whois_client = &self.whois_client;
170                let rdap_client = &self.rdap_client;
171                let dns_resolver = &self.dns_resolver;
172                let propagation_checker = &self.propagation_checker;
173                let smart_lookup = &self.smart_lookup;
174                let status_client = &self.status_client;
175                let availability_checker = &self.availability_checker;
176                let ssl_checker = &self.ssl_checker;
177
178                async move {
179                    // Rate-limited dispatch: claim our slot quickly under the
180                    // lock, then `sleep_until` outside the lock so multiple
181                    // tasks can be sleeping in parallel on their own slots.
182                    if let Some(limiter) = &limiter {
183                        let my_slot = {
184                            let mut next = limiter.lock().await;
185                            let now = TokioInstant::now();
186                            let slot = match *next {
187                                Some(prev) if prev > now => prev,
188                                _ => now,
189                            };
190                            *next = Some(slot + rate_limit_delay);
191                            slot
192                        };
193                        sleep_until(my_slot).await;
194                    }
195
196                    let start = std::time::Instant::now();
197                    let result = execute_operation(
198                        &op,
199                        &Clients {
200                            whois: whois_client,
201                            rdap: rdap_client,
202                            dns: dns_resolver,
203                            propagation: propagation_checker,
204                            lookup: smart_lookup,
205                            status: status_client,
206                            avail: availability_checker,
207                            ssl: ssl_checker,
208                        },
209                    )
210                    .await;
211                    let duration_ms = start.elapsed().as_millis() as u64;
212
213                    let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
214
215                    if let Some(progress) = progress {
216                        let desc = match &op {
217                            BulkOperation::Whois { domain }
218                            | BulkOperation::Rdap { domain }
219                            | BulkOperation::Dns { domain, .. }
220                            | BulkOperation::Propagation { domain, .. }
221                            | BulkOperation::Lookup { domain }
222                            | BulkOperation::Status { domain }
223                            | BulkOperation::Avail { domain }
224                            | BulkOperation::Info { domain }
225                            | BulkOperation::Ssl { domain } => domain.as_str(),
226                        };
227                        progress(count, total, desc);
228                    }
229
230                    match result {
231                        Ok(data) => BulkResult {
232                            operation: op,
233                            success: true,
234                            data: Some(data),
235                            error: None,
236                            duration_ms,
237                        },
238                        Err(e) => {
239                            debug!(error = %e, "Bulk operation failed");
240                            BulkResult {
241                                operation: op,
242                                success: false,
243                                data: None,
244                                error: Some(e.to_string()),
245                                duration_ms,
246                            }
247                        }
248                    }
249                }
250            })
251            .buffer_unordered(self.concurrency)
252            .collect()
253            .await;
254
255        let succeeded = results.iter().filter(|r| r.success).count();
256        let failed = results.iter().filter(|r| !r.success).count();
257        info!(
258            total = total,
259            succeeded = succeeded,
260            failed = failed,
261            "bulk operation completed"
262        );
263
264        results
265    }
266
267    pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
268        let operations = domains
269            .into_iter()
270            .map(|domain| BulkOperation::Whois { domain })
271            .collect();
272        self.execute(operations, None).await
273    }
274
275    pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
276        let operations = domains
277            .into_iter()
278            .map(|domain| BulkOperation::Rdap { domain })
279            .collect();
280        self.execute(operations, None).await
281    }
282
283    pub async fn execute_dns(
284        &self,
285        domains: Vec<String>,
286        record_type: RecordType,
287    ) -> Vec<BulkResult> {
288        let operations = domains
289            .into_iter()
290            .map(|domain| BulkOperation::Dns {
291                domain,
292                record_type,
293            })
294            .collect();
295        self.execute(operations, None).await
296    }
297
298    pub async fn execute_propagation(
299        &self,
300        domains: Vec<String>,
301        record_type: RecordType,
302    ) -> Vec<BulkResult> {
303        let operations = domains
304            .into_iter()
305            .map(|domain| BulkOperation::Propagation {
306                domain,
307                record_type,
308            })
309            .collect();
310        self.execute(operations, None).await
311    }
312
313    pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
314        let operations = domains
315            .into_iter()
316            .map(|domain| BulkOperation::Lookup { domain })
317            .collect();
318        self.execute(operations, None).await
319    }
320
321    pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
322        let operations = domains
323            .into_iter()
324            .map(|domain| BulkOperation::Status { domain })
325            .collect();
326        self.execute(operations, None).await
327    }
328
329    pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
330        let operations = domains
331            .into_iter()
332            .map(|domain| BulkOperation::Avail { domain })
333            .collect();
334        self.execute(operations, None).await
335    }
336
337    pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
338        let operations = domains
339            .into_iter()
340            .map(|domain| BulkOperation::Info { domain })
341            .collect();
342        self.execute(operations, None).await
343    }
344
345    pub async fn execute_ssl(&self, domains: Vec<String>) -> Vec<BulkResult> {
346        let operations = domains
347            .into_iter()
348            .map(|domain| BulkOperation::Ssl { domain })
349            .collect();
350        self.execute(operations, None).await
351    }
352}
353
354struct Clients<'a> {
355    whois: &'a WhoisClient,
356    rdap: &'a RdapClient,
357    dns: &'a DnsResolver,
358    propagation: &'a PropagationChecker,
359    lookup: &'a SmartLookup,
360    status: &'a StatusClient,
361    avail: &'a AvailabilityChecker,
362    ssl: &'a SslChecker,
363}
364
365async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
366    match op {
367        BulkOperation::Whois { domain } => {
368            let result = clients.whois.lookup(domain).await?;
369            Ok(BulkResultData::Whois(result))
370        }
371        BulkOperation::Rdap { domain } => {
372            let result = clients.rdap.lookup_domain(domain).await?;
373            Ok(BulkResultData::Rdap(Box::new(result)))
374        }
375        BulkOperation::Dns {
376            domain,
377            record_type,
378        } => {
379            let result = clients.dns.resolve(domain, *record_type, None).await?;
380            Ok(BulkResultData::Dns(result))
381        }
382        BulkOperation::Propagation {
383            domain,
384            record_type,
385        } => {
386            let result = clients.propagation.check(domain, *record_type).await?;
387            Ok(BulkResultData::Propagation(result))
388        }
389        BulkOperation::Lookup { domain } => {
390            let result = clients.lookup.lookup(domain).await?;
391            Ok(BulkResultData::Lookup(result))
392        }
393        BulkOperation::Status { domain } => {
394            let result = clients.status.check(domain).await?;
395            Ok(BulkResultData::Status(result))
396        }
397        BulkOperation::Avail { domain } => {
398            let result = clients.avail.check(domain).await?;
399            Ok(BulkResultData::Avail(result))
400        }
401        BulkOperation::Info { domain } => {
402            let result = clients.lookup.lookup(domain).await?;
403            Ok(BulkResultData::Info(
404                crate::domain_info::DomainInfo::from_lookup_result(&result),
405            ))
406        }
407        BulkOperation::Ssl { domain } => {
408            let result = clients.ssl.check(domain).await?;
409            Ok(BulkResultData::Ssl(result))
410        }
411    }
412}
413
/// Labels that commonly head the domain column of a CSV file.
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// Tells whether `first` looks like a CSV header row instead of a domain.
///
/// Only the first comma-separated column is inspected, after trimming. The
/// row counts as a header when that column contains no `.` (which every real
/// domain such as "example.com" has) AND equals one of [`HEADER_KEYWORDS`],
/// compared case-insensitively.
fn is_csv_header_row(first: &str) -> bool {
    let column = match first.split_once(',') {
        Some((head, _rest)) => head,
        None => first,
    }
    .trim();

    // A dotted value is a domain, never a header label.
    if column.contains('.') {
        return false;
    }

    let lowered = column.to_lowercase();
    HEADER_KEYWORDS.iter().any(|keyword| *keyword == lowered)
}
432
433pub fn parse_domains_from_file(content: &str) -> Vec<String> {
434    let mut domains: Vec<String> = content
435        .lines()
436        .map(|line| line.trim())
437        .filter(|line| !line.is_empty() && !line.starts_with('#'))
438        .map(|line| {
439            // Handle CSV format (take first column)
440            line.split(',').next().unwrap_or(line).trim().to_string()
441        })
442        .filter(|domain| domain.contains('.'))
443        .collect();
444
445    // A bare-keyword CSV header like "domain" / "hostname" has no dot and is
446    // already dropped by the filter above. `is_csv_header_row` additionally
447    // covers the edge case where the first surviving entry is a dotted header
448    // (rare, and the current heuristic treats such values as real domains —
449    // see `domain_dot_com_is_not_dropped_as_header`). The guard below is a
450    // belt-and-suspenders check that stays correct if the upstream filter
451    // ever changes.
452    if domains
453        .first()
454        .map(|d| is_csv_header_row(d))
455        .unwrap_or(false)
456    {
457        domains.remove(0);
458    }
459
460    domains
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_domains_from_file() {
        // Leading/trailing empty entries reproduce the blank first line and
        // the final newline of the fixture.
        let content = [
            "",
            "# This is a comment",
            "example1.com",
            "google2.com",
            "  whitespace3.com",
            "invalid",
            "csv,format,example.org",
            "",
        ]
        .join("\n");

        let domains = parse_domains_from_file(&content);

        // "invalid" and "csv" are filtered out because they don't contain a dot.
        assert_eq!(domains.len(), 3);
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(domains.iter().any(|d| d.as_str() == expected));
        }
    }

    #[test]
    fn test_parse_domains_skip_bare_header() {
        // A bare-keyword header ("domain") has no dot, so the dot filter drops it.
        let parsed = parse_domains_from_file("domain\nexample.com\n");
        assert_eq!(parsed, vec!["example.com"]);
    }

    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        // The header's first column is a bare keyword without a dot, so the
        // whole header row is dropped by the dot filter.
        let parsed = parse_domains_from_file("domain,status\ngoogle.com,live\n");
        assert_eq!(parsed, vec!["google.com"]);
        assert!(!parsed.iter().any(|d| d.as_str() == "domain"));
    }

    #[test]
    fn first_domain_with_no_digits_is_kept() {
        // Regression: an earlier heuristic dropped the first entry when it
        // had no ASCII digits, losing "google.com" / "amazon.com" / ...
        let parsed = parse_domains_from_file("google.com\namazon.com\n");
        assert_eq!(parsed, vec!["google.com", "amazon.com"]);
    }

    #[test]
    fn header_row_named_hostname_is_dropped() {
        let parsed = parse_domains_from_file("hostname\nexample.com\n");
        assert_eq!(parsed, vec!["example.com"]);
    }

    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        // "domain.com" has a dot, so it is a real domain and not a header.
        let parsed = parse_domains_from_file("domain.com\nexample.com\n");
        assert_eq!(parsed, vec!["domain.com", "example.com"]);
    }

    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        for header in ["domain", "Hostname", "URL", "domain,status,notes", "  host  "] {
            assert!(is_csv_header_row(header));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        for value in ["domain.com", "google.com", "host.name"] {
            assert!(!is_csv_header_row(value));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        for value in ["example", "mydata"] {
            assert!(!is_csv_header_row(value));
        }
    }

    /// Behavioural guard against the v0.26.7 rate-limiter regression, where
    /// a `tokio::sync::Mutex` guard was held across `Interval::tick().await`
    /// and all dispatch was serialized through the lock.
    ///
    /// Setup: concurrency 5, 50 ms rate limit, four operations that are
    /// expected to fail quickly. Dispatch slots are 50 ms apart, so the last
    /// operation starts about 150 ms in; the operations themselves should
    /// overlap. A serialized implementation would instead take roughly the
    /// sum of the per-operation costs.
    #[tokio::test]
    async fn rate_limiter_dispatches_in_parallel_not_serialized() {
        let executor = BulkExecutor::new()
            .with_concurrency(5)
            .with_rate_limit(Duration::from_millis(50));

        // Unresolvable `.invalid` hosts are expected to fail fast at DNS,
        // well under the limiter's slot spacing.
        let began = std::time::Instant::now();
        let domains: Vec<String> = (1..=4)
            .map(|i| format!("seer-rl-{}.invalid", i))
            .collect();
        let results = executor.execute_ssl(domains).await;
        let elapsed = began.elapsed();

        assert_eq!(results.len(), 4);
        // Generous upper bound: 2 s is far above the expected wall time
        // (4 * 50 ms of slots + 4 * 100 ms of DNS failure = ~600 ms) yet
        // meant to stay below what the serialized path would need.
        assert!(
            elapsed < Duration::from_secs(2),
            "rate-limited dispatch should run in parallel; took {:?}",
            elapsed
        );
    }

    #[tokio::test]
    async fn execute_ssl_failure_path_for_unresolvable_host() {
        // Checks the wiring of the SSL arm: an unresolvable hostname must
        // come back as success=false with a non-empty error string. The
        // IETF-reserved `.invalid` TLD keeps the test hermetic.
        let host = "seer-bulk-ssl-test.invalid";
        let executor = BulkExecutor::new().with_rate_limit(Duration::ZERO);
        let results = executor.execute_ssl(vec![host.to_string()]).await;

        assert_eq!(results.len(), 1);
        let r = &results[0];
        assert!(!r.success, "expected failure, got success");
        assert!(r.data.is_none(), "expected no data on failure");

        let err = r.error.as_deref().unwrap_or("");
        assert!(!err.is_empty(), "expected non-empty error, got: {:?}", err);

        let is_expected_ssl_op =
            matches!(&r.operation, BulkOperation::Ssl { domain } if domain == host);
        assert!(
            is_expected_ssl_op,
            "expected Ssl variant, got {:?}",
            r.operation
        );
    }

    #[tokio::test]
    #[ignore = "live network: hits cloudflare.com:443; run with --ignored"]
    async fn execute_ssl_live_cloudflare_has_non_empty_chain() {
        let results = BulkExecutor::new()
            .execute_ssl(vec![String::from("cloudflare.com")])
            .await;

        assert_eq!(results.len(), 1);
        let r = &results[0];
        assert!(r.success, "expected success, got error: {:?}", r.error);

        let report = match &r.data {
            Some(BulkResultData::Ssl(report)) => report,
            other => panic!("expected Ssl data, got {:?}", other),
        };
        assert!(!report.chain.is_empty(), "chain must not be empty");
        assert!(report.is_valid, "cloudflare leaf cert should be valid");
        assert!(
            report.days_until_expiry > 0,
            "cert should still be valid in the future, got {} days",
            report.days_until_expiry
        );
        assert!(
            !report.san_names.is_empty(),
            "cloudflare cert should have SAN entries"
        );
    }
}