Skip to main content

seer_core/bulk/
executor.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::sync::Mutex;
8use tokio::time::{sleep_until, Instant as TokioInstant};
9use tracing::{debug, info, instrument};
10
11use crate::availability::{AvailabilityChecker, AvailabilityResult};
12use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
13use crate::error::Result;
14use crate::lookup::{LookupResult, SmartLookup};
15use crate::rdap::{RdapClient, RdapResponse};
16use crate::ssl::{SslChecker, SslReport};
17use crate::status::{StatusClient, StatusResponse};
18use crate::whois::{WhoisClient, WhoisResponse};
19
/// Progress hook invoked by [`BulkExecutor::execute`] each time an operation finishes.
///
/// It is called as `progress(completed, total, domain)`: the number of
/// operations finished so far (including the one that just finished), the
/// number of operations in the batch, and the domain of the operation that
/// just finished. It runs for failed operations as well as successful ones.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync>;
21
/// A single operation to perform in a bulk execution batch.
///
/// Serialized with an internal `"type"` tag whose value is the variant name
/// in `snake_case` (e.g. `{"type": "whois", "domain": "example.com"}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BulkOperation {
    /// WHOIS lookup for `domain`.
    Whois {
        domain: String,
    },
    /// RDAP domain lookup for `domain`.
    Rdap {
        domain: String,
    },
    /// DNS resolution of `record_type` records for `domain`.
    Dns {
        domain: String,
        record_type: RecordType,
    },
    /// Propagation check of `record_type` records for `domain`.
    Propagation {
        domain: String,
        record_type: RecordType,
    },
    /// Lookup of `domain` through [`SmartLookup`].
    Lookup {
        domain: String,
    },
    /// Status check for `domain`.
    Status {
        domain: String,
    },
    /// Availability check for `domain`.
    Avail {
        domain: String,
    },
    /// [`SmartLookup`] lookup of `domain`, converted into a `DomainInfo`.
    Info {
        domain: String,
    },
    /// SSL check for `domain`.
    Ssl {
        domain: String,
    },
}
56
/// The data returned from a bulk operation (varies by operation type).
///
/// Each variant carries the payload of the [`BulkOperation`] variant with the
/// same name. Serialized adjacently tagged: the `snake_case` variant name goes
/// in `"result_type"` and the payload in `"data"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
pub enum BulkResultData {
    /// Payload of a WHOIS lookup.
    Whois(WhoisResponse),
    /// Payload of an RDAP lookup (boxed).
    Rdap(Box<RdapResponse>),
    /// Records returned by a DNS resolution.
    Dns(Vec<DnsRecord>),
    /// Payload of a propagation check.
    Propagation(PropagationResult),
    /// Payload of a `SmartLookup` lookup.
    Lookup(LookupResult),
    /// Payload of a status check.
    Status(StatusResponse),
    /// Payload of an availability check.
    Avail(AvailabilityResult),
    /// Domain info derived from a `SmartLookup` result.
    Info(crate::domain_info::DomainInfo),
    /// Payload of an SSL check.
    Ssl(SslReport),
}
71
/// Result of a single operation within a bulk execution.
///
/// Exactly one of `data` / `error` is populated by the executor: `data` when
/// `success` is true, `error` when it is false.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkResult {
    /// The operation this result belongs to.
    pub operation: BulkOperation,
    /// Whether the operation completed without error.
    pub success: bool,
    /// The operation's payload; `Some` only on success.
    pub data: Option<BulkResultData>,
    /// Sanitized error message; `Some` only on failure.
    pub error: Option<String>,
    /// Wall-clock time spent running the operation, in milliseconds. Measured
    /// after any rate-limit wait, so that wait is not included.
    pub duration_ms: u64,
}
81
/// Executes bulk operations concurrently with rate limiting.
///
/// Holds one client per operation type; every batch run through the same
/// executor reuses those clients.
#[derive(Debug, Clone)]
pub struct BulkExecutor {
    // Maximum number of operations in flight at once.
    concurrency: usize,
    // Minimum spacing between operation starts; zero disables rate limiting.
    rate_limit_delay: Duration,
    // Client for `BulkOperation::Whois`.
    whois_client: WhoisClient,
    // Client for `BulkOperation::Rdap`.
    rdap_client: RdapClient,
    // Resolver for `BulkOperation::Dns`.
    dns_resolver: DnsResolver,
    // Checker for `BulkOperation::Propagation`.
    propagation_checker: PropagationChecker,
    // Backs both `BulkOperation::Lookup` and `BulkOperation::Info`.
    smart_lookup: SmartLookup,
    // Client for `BulkOperation::Status`.
    status_client: StatusClient,
    // Checker for `BulkOperation::Avail`.
    availability_checker: AvailabilityChecker,
    // Checker for `BulkOperation::Ssl`.
    ssl_checker: SslChecker,
}
96
97impl Default for BulkExecutor {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl BulkExecutor {
104    pub fn new() -> Self {
105        Self {
106            concurrency: 10,
107            rate_limit_delay: Duration::from_millis(100),
108            whois_client: WhoisClient::new(),
109            rdap_client: RdapClient::new(),
110            dns_resolver: DnsResolver::new(),
111            propagation_checker: PropagationChecker::new(),
112            smart_lookup: SmartLookup::new(),
113            status_client: StatusClient::new(),
114            availability_checker: AvailabilityChecker::new(),
115            ssl_checker: SslChecker::new(),
116        }
117    }
118
119    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
120        self.concurrency = concurrency.clamp(1, 50);
121        self
122    }
123
124    pub fn with_rate_limit(mut self, delay: Duration) -> Self {
125        self.rate_limit_delay = delay;
126        self
127    }
128
    /// Runs every operation in `operations` and returns one [`BulkResult`] per
    /// operation.
    ///
    /// At most `self.concurrency` operations are in flight at once. They are
    /// driven through `buffer_unordered`, so results are returned in
    /// completion order, not input order. When `rate_limit_delay` is non-zero,
    /// operation starts are spaced at least that far apart; a zero delay
    /// disables the limiter.
    ///
    /// A failing operation never aborts the batch: it is reported as a
    /// `BulkResult` with `success == false` and the error's sanitized message.
    ///
    /// `progress`, when supplied, is called after each operation finishes with
    /// `(completed_so_far, total, domain)`.
    #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
    pub async fn execute(
        &self,
        operations: Vec<BulkOperation>,
        progress: Option<ProgressCallback>,
    ) -> Vec<BulkResult> {
        let total = operations.len();
        let completed = Arc::new(AtomicUsize::new(0));

        info!("bulk operation started");
        debug!(
            total = total,
            concurrency = self.concurrency,
            "Starting bulk execution"
        );

        // Per-batch rate limiter. The naive `interval(...).tick().await` inside
        // a `Mutex` would hold the lock across the await, serializing all
        // dispatch through the lock at 1 / rate_limit_delay — exactly the
        // regression v0.26.7 tried to fix from the other direction.
        //
        // Instead each task locks briefly to *claim* its dispatch slot (the
        // next monotonically-increasing deadline at `rate_limit_delay`
        // spacing), then releases the lock and `sleep_until`s its assigned
        // slot. With concurrency=N and rate=D, N tasks can be sleeping
        // simultaneously on N distinct deadlines, and execution begins at
        // each task's deadline — so global dispatch rate is 1/D while up
        // to N tasks run their operation concurrently.
        //
        // The mutex holds the next free slot; `None` means no slot has been
        // claimed yet in this batch.
        let limiter = if self.rate_limit_delay.is_zero() {
            None
        } else {
            Some(Arc::new(Mutex::new(None::<TokioInstant>)))
        };
        let rate_limit_delay = self.rate_limit_delay;

        let results: Vec<BulkResult> = stream::iter(operations)
            .map(|op| {
                let completed = completed.clone();
                let progress = progress.as_ref();
                let limiter = limiter.clone();
                let whois_client = &self.whois_client;
                let rdap_client = &self.rdap_client;
                let dns_resolver = &self.dns_resolver;
                let propagation_checker = &self.propagation_checker;
                let smart_lookup = &self.smart_lookup;
                let status_client = &self.status_client;
                let availability_checker = &self.availability_checker;
                let ssl_checker = &self.ssl_checker;

                async move {
                    // Rate-limited dispatch: claim our slot quickly under the
                    // lock, then `sleep_until` outside the lock so multiple
                    // tasks can be sleeping in parallel on their own slots.
                    if let Some(limiter) = &limiter {
                        let my_slot = {
                            let mut next = limiter.lock().await;
                            let now = TokioInstant::now();
                            // Take the stored slot if it is still in the
                            // future; otherwise start right away.
                            let slot = match *next {
                                Some(prev) if prev > now => prev,
                                _ => now,
                            };
                            *next = Some(slot + rate_limit_delay);
                            slot
                        };
                        sleep_until(my_slot).await;
                    }

                    // Timed from here, so `duration_ms` excludes the
                    // rate-limit wait above.
                    let start = std::time::Instant::now();
                    let result = execute_operation(
                        &op,
                        &Clients {
                            whois: whois_client,
                            rdap: rdap_client,
                            dns: dns_resolver,
                            propagation: propagation_checker,
                            lookup: smart_lookup,
                            status: status_client,
                            avail: availability_checker,
                            ssl: ssl_checker,
                        },
                    )
                    .await;
                    let duration_ms = start.elapsed().as_millis() as u64;

                    // `fetch_add` returns the previous value; +1 yields the
                    // count including this operation.
                    let count = completed.fetch_add(1, Ordering::Relaxed) + 1;

                    if let Some(progress) = progress {
                        // Every variant carries a `domain`; it is the
                        // description handed to the callback.
                        let desc = match &op {
                            BulkOperation::Whois { domain }
                            | BulkOperation::Rdap { domain }
                            | BulkOperation::Dns { domain, .. }
                            | BulkOperation::Propagation { domain, .. }
                            | BulkOperation::Lookup { domain }
                            | BulkOperation::Status { domain }
                            | BulkOperation::Avail { domain }
                            | BulkOperation::Info { domain }
                            | BulkOperation::Ssl { domain } => domain.as_str(),
                        };
                        progress(count, total, desc);
                    }

                    match result {
                        Ok(data) => BulkResult {
                            operation: op,
                            success: true,
                            data: Some(data),
                            error: None,
                            duration_ms,
                        },
                        Err(e) => {
                            debug!(error = %e, "Bulk operation failed");
                            BulkResult {
                                operation: op,
                                success: false,
                                data: None,
                                // Store the sanitized form for external return;
                                // full detail stays in the debug log above.
                                error: Some(e.sanitized_message()),
                                duration_ms,
                            }
                        }
                    }
                }
            })
            .buffer_unordered(self.concurrency)
            .collect()
            .await;

        let succeeded = results.iter().filter(|r| r.success).count();
        let failed = results.iter().filter(|r| !r.success).count();
        info!(
            total = total,
            succeeded = succeeded,
            failed = failed,
            "bulk operation completed"
        );

        results
    }
268
269    pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
270        let operations = domains
271            .into_iter()
272            .map(|domain| BulkOperation::Whois { domain })
273            .collect();
274        self.execute(operations, None).await
275    }
276
277    pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
278        let operations = domains
279            .into_iter()
280            .map(|domain| BulkOperation::Rdap { domain })
281            .collect();
282        self.execute(operations, None).await
283    }
284
285    pub async fn execute_dns(
286        &self,
287        domains: Vec<String>,
288        record_type: RecordType,
289    ) -> Vec<BulkResult> {
290        let operations = domains
291            .into_iter()
292            .map(|domain| BulkOperation::Dns {
293                domain,
294                record_type,
295            })
296            .collect();
297        self.execute(operations, None).await
298    }
299
300    pub async fn execute_propagation(
301        &self,
302        domains: Vec<String>,
303        record_type: RecordType,
304    ) -> Vec<BulkResult> {
305        let operations = domains
306            .into_iter()
307            .map(|domain| BulkOperation::Propagation {
308                domain,
309                record_type,
310            })
311            .collect();
312        self.execute(operations, None).await
313    }
314
315    pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
316        let operations = domains
317            .into_iter()
318            .map(|domain| BulkOperation::Lookup { domain })
319            .collect();
320        self.execute(operations, None).await
321    }
322
323    pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
324        let operations = domains
325            .into_iter()
326            .map(|domain| BulkOperation::Status { domain })
327            .collect();
328        self.execute(operations, None).await
329    }
330
331    pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
332        let operations = domains
333            .into_iter()
334            .map(|domain| BulkOperation::Avail { domain })
335            .collect();
336        self.execute(operations, None).await
337    }
338
339    pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
340        let operations = domains
341            .into_iter()
342            .map(|domain| BulkOperation::Info { domain })
343            .collect();
344        self.execute(operations, None).await
345    }
346
347    pub async fn execute_ssl(&self, domains: Vec<String>) -> Vec<BulkResult> {
348        let operations = domains
349            .into_iter()
350            .map(|domain| BulkOperation::Ssl { domain })
351            .collect();
352        self.execute(operations, None).await
353    }
354}
355
/// Borrowed bundle of the executor's clients, passed to `execute_operation`
/// as a single argument instead of eight separate references.
struct Clients<'a> {
    // Serves `BulkOperation::Whois`.
    whois: &'a WhoisClient,
    // Serves `BulkOperation::Rdap`.
    rdap: &'a RdapClient,
    // Serves `BulkOperation::Dns`.
    dns: &'a DnsResolver,
    // Serves `BulkOperation::Propagation`.
    propagation: &'a PropagationChecker,
    // Serves both `BulkOperation::Lookup` and `BulkOperation::Info`.
    lookup: &'a SmartLookup,
    // Serves `BulkOperation::Status`.
    status: &'a StatusClient,
    // Serves `BulkOperation::Avail`.
    avail: &'a AvailabilityChecker,
    // Serves `BulkOperation::Ssl`.
    ssl: &'a SslChecker,
}
366
367async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
368    match op {
369        BulkOperation::Whois { domain } => {
370            let result = clients.whois.lookup(domain).await?;
371            Ok(BulkResultData::Whois(result))
372        }
373        BulkOperation::Rdap { domain } => {
374            let result = clients.rdap.lookup_domain(domain).await?;
375            Ok(BulkResultData::Rdap(Box::new(result)))
376        }
377        BulkOperation::Dns {
378            domain,
379            record_type,
380        } => {
381            let result = clients.dns.resolve(domain, *record_type, None).await?;
382            Ok(BulkResultData::Dns(result))
383        }
384        BulkOperation::Propagation {
385            domain,
386            record_type,
387        } => {
388            let result = clients.propagation.check(domain, *record_type).await?;
389            Ok(BulkResultData::Propagation(result))
390        }
391        BulkOperation::Lookup { domain } => {
392            let result = clients.lookup.lookup(domain).await?;
393            Ok(BulkResultData::Lookup(result))
394        }
395        BulkOperation::Status { domain } => {
396            let result = clients.status.check(domain).await?;
397            Ok(BulkResultData::Status(result))
398        }
399        BulkOperation::Avail { domain } => {
400            let result = clients.avail.check(domain).await?;
401            Ok(BulkResultData::Avail(result))
402        }
403        BulkOperation::Info { domain } => {
404            let result = clients.lookup.lookup(domain).await?;
405            Ok(BulkResultData::Info(
406                crate::domain_info::DomainInfo::from_lookup_result(&result),
407            ))
408        }
409        BulkOperation::Ssl { domain } => {
410            let result = clients.ssl.check(domain).await?;
411            Ok(BulkResultData::Ssl(result))
412        }
413    }
414}
415
/// Labels that commonly head the domain column of a CSV export.
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// Reports whether `first` reads as a CSV header line rather than a domain entry.
///
/// Only the first comma-separated cell is inspected, after trimming. A cell
/// that contains a `.` is never a header (real domains such as "example.com"
/// or "domain.com" always have one). Otherwise the cell is a header exactly
/// when its lowercased form equals one of [`HEADER_KEYWORDS`].
fn is_csv_header_row(first: &str) -> bool {
    let leading_cell = match first.split_once(',') {
        Some((head, _rest)) => head,
        None => first,
    }
    .trim();

    // Dotted cells are domains; only bare labels can be headers.
    !leading_cell.contains('.') && {
        let lowered = leading_cell.to_lowercase();
        HEADER_KEYWORDS.iter().any(|keyword| lowered == *keyword)
    }
}
434
435pub fn parse_domains_from_file(content: &str) -> Vec<String> {
436    let mut domains: Vec<String> = content
437        .lines()
438        .map(|line| line.trim())
439        .filter(|line| !line.is_empty() && !line.starts_with('#'))
440        .map(|line| {
441            // Handle CSV format (take first column)
442            line.split(',').next().unwrap_or(line).trim().to_string()
443        })
444        .filter(|domain| domain.contains('.'))
445        .collect();
446
447    // A bare-keyword CSV header like "domain" / "hostname" has no dot and is
448    // already dropped by the filter above. `is_csv_header_row` additionally
449    // covers the edge case where the first surviving entry is a dotted header
450    // (rare, and the current heuristic treats such values as real domains —
451    // see `domain_dot_com_is_not_dropped_as_header`). The guard below is a
452    // belt-and-suspenders check that stays correct if the upstream filter
453    // ever changes.
454    if domains
455        .first()
456        .map(|d| is_csv_header_row(d))
457        .unwrap_or(false)
458    {
459        domains.remove(0);
460    }
461
462    domains
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_domains_from_file() {
        let content = r#"
# This is a comment
example1.com
google2.com
  whitespace3.com
invalid
csv,format,example.org
"#;

        let domains = parse_domains_from_file(content);
        assert_eq!(domains.len(), 3);
        assert!(domains.contains(&"example1.com".to_string()));
        assert!(domains.contains(&"google2.com".to_string()));
        assert!(domains.contains(&"whitespace3.com".to_string()));
        // "invalid" and "csv" are filtered out because they don't contain a dot
    }

    #[test]
    fn test_parse_domains_skip_bare_header() {
        // Bare-keyword header ("domain") is dropped by the dot filter.
        let content = "domain\nexample.com\n";
        let domains = parse_domains_from_file(content);
        assert_eq!(domains, vec!["example.com"]);
    }

    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        // First column is a bare keyword → whole header row has no dot in col 1
        // and is dropped by the dot filter.
        let content = "domain,status\ngoogle.com,live\n";
        let domains = parse_domains_from_file(content);
        assert_eq!(domains, vec!["google.com"]);
        assert!(!domains.contains(&"domain".to_string()));
    }

    #[test]
    fn first_domain_with_no_digits_is_kept() {
        // Regression: previous heuristic dropped the first entry when it
        // had no ASCII digits, losing "google.com" / "amazon.com" / ...
        let input = "google.com\namazon.com\n";
        let result = parse_domains_from_file(input);
        assert_eq!(result, vec!["google.com", "amazon.com"]);
    }

    #[test]
    fn header_row_named_hostname_is_dropped() {
        let input = "hostname\nexample.com\n";
        let result = parse_domains_from_file(input);
        assert_eq!(result, vec!["example.com"]);
    }

    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        // "domain.com" is a real domain, not a header, because it has a dot.
        let input = "domain.com\nexample.com\n";
        let result = parse_domains_from_file(input);
        assert_eq!(result, vec!["domain.com", "example.com"]);
    }

    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        assert!(is_csv_header_row("domain"));
        assert!(is_csv_header_row("Hostname"));
        assert!(is_csv_header_row("URL"));
        assert!(is_csv_header_row("domain,status,notes"));
        assert!(is_csv_header_row("  host  "));
    }

    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        assert!(!is_csv_header_row("domain.com"));
        assert!(!is_csv_header_row("google.com"));
        assert!(!is_csv_header_row("host.name"));
    }

    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        assert!(!is_csv_header_row("example"));
        assert!(!is_csv_header_row("mydata"));
    }

    /// Regression test for the v0.26.7 rate-limiter regression. The
    /// previous implementation held a `tokio::sync::Mutex` guard across
    /// `Interval::tick().await`, fully serializing dispatch through the
    /// lock. We need a behavioural assertion (not just code review) that
    /// catches that pattern if it ever returns.
    ///
    /// With concurrency 5, rate_limit 50ms, and 4 hermetic-failure tasks:
    /// - Dispatch should be spaced ~50ms apart (slot-claim semantics), so
    ///   the batch can never finish in less than 3 * 50ms.
    /// - All 4 tasks should *finish* in well under 4 * (per-op cost),
    ///   i.e. they overlap on the execution side.
    /// - Total wall time should be approximately the longest single op
    ///   plus the cumulative slot delays (~150ms).
    ///
    /// The previous (broken) implementation would have produced wall
    /// time = sum-of-per-op-cost because dispatch was serialized.
    #[tokio::test]
    async fn rate_limiter_dispatches_in_parallel_not_serialized() {
        use std::time::Instant;
        let rate_limit = Duration::from_millis(50);
        let executor = BulkExecutor::new()
            .with_concurrency(5)
            .with_rate_limit(rate_limit);

        // Use unresolvable `.invalid` hosts — they fail fast at DNS, well
        // under the limiter's slot spacing. If dispatch IS rate-limited
        // in parallel, all 4 finish in close to 3 * 50ms = 150ms (plus
        // per-op DNS-fail cost, typically tens of ms). If dispatch was
        // serialized through a lock, each task waits for the previous
        // to FINISH before starting — total would be much higher.
        let start = Instant::now();
        let domains = vec![
            "seer-rl-1.invalid".to_string(),
            "seer-rl-2.invalid".to_string(),
            "seer-rl-3.invalid".to_string(),
            "seer-rl-4.invalid".to_string(),
        ];
        let results = executor.execute_ssl(domains).await;
        let elapsed = start.elapsed();

        assert_eq!(results.len(), 4);
        // Lower bound — the fourth task's slot lies three delays after the
        // first, and `sleep_until` never returns before its deadline, so a
        // faster finish means the limiter was bypassed.
        assert!(
            elapsed >= rate_limit * 3,
            "dispatch slots should be spaced by the rate limit; took {:?}",
            elapsed
        );
        // Generous upper bound — 2s is far above the worst legitimate
        // wall time (4*50ms slots + 4*100ms DNS-fail = ~600ms) but well
        // below the serialised path (would be 4 * per-op-cost minimum).
        assert!(
            elapsed < Duration::from_secs(2),
            "rate-limited dispatch should run in parallel; took {:?}",
            elapsed
        );
    }

    #[tokio::test]
    async fn execute_ssl_failure_path_for_unresolvable_host() {
        // Verifies the SSL bulk arm wires correctly: an unresolvable hostname
        // must surface as success=false with a non-empty error string. Uses
        // the IETF-reserved `.invalid` TLD so this is hermetic.
        let executor = BulkExecutor::new().with_rate_limit(Duration::ZERO);
        let results = executor
            .execute_ssl(vec!["seer-bulk-ssl-test.invalid".to_string()])
            .await;
        assert_eq!(results.len(), 1);
        let r = &results[0];
        assert!(!r.success, "expected failure, got success");
        assert!(r.data.is_none(), "expected no data on failure");
        let err = r.error.as_deref().unwrap_or("");
        assert!(!err.is_empty(), "expected non-empty error, got: {:?}", err);
        assert!(
            matches!(r.operation, BulkOperation::Ssl { ref domain } if domain == "seer-bulk-ssl-test.invalid"),
            "expected Ssl variant, got {:?}",
            r.operation
        );
    }

    #[tokio::test]
    #[ignore = "live network: hits cloudflare.com:443; run with --ignored"]
    async fn execute_ssl_live_cloudflare_has_non_empty_chain() {
        let executor = BulkExecutor::new();
        let results = executor
            .execute_ssl(vec!["cloudflare.com".to_string()])
            .await;
        assert_eq!(results.len(), 1);
        let r = &results[0];
        assert!(r.success, "expected success, got error: {:?}", r.error);
        let Some(BulkResultData::Ssl(ref report)) = r.data else {
            panic!("expected Ssl data, got {:?}", r.data);
        };
        assert!(!report.chain.is_empty(), "chain must not be empty");
        assert!(report.is_valid, "cloudflare leaf cert should be valid");
        assert!(
            report.days_until_expiry > 0,
            "cert should still be valid in the future, got {} days",
            report.days_until_expiry
        );
        assert!(
            !report.san_names.is_empty(),
            "cloudflare cert should have SAN entries"
        );
    }
}