Skip to main content

seer_core/bulk/
executor.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::time::sleep;
8use tracing::{debug, info, instrument, warn};
9
10use crate::availability::{AvailabilityChecker, AvailabilityResult};
11use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
12use crate::error::Result;
13use crate::lookup::{LookupResult, SmartLookup};
14use crate::rdap::{RdapClient, RdapResponse};
15use crate::status::{StatusClient, StatusResponse};
16use crate::whois::{WhoisClient, WhoisResponse};
17
/// Progress hook for [`BulkExecutor::execute`].
///
/// Invoked once per finished operation with `(completed_so_far, total, domain)`.
/// The `'static` bound is what `Box<dyn ...>` implies anyway; it is spelled out here
/// for readers.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync + 'static>;
19
20/// A single operation to perform in a bulk execution batch.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(tag = "type", rename_all = "snake_case")]
23pub enum BulkOperation {
24    Whois {
25        domain: String,
26    },
27    Rdap {
28        domain: String,
29    },
30    Dns {
31        domain: String,
32        record_type: RecordType,
33    },
34    Propagation {
35        domain: String,
36        record_type: RecordType,
37    },
38    Lookup {
39        domain: String,
40    },
41    Status {
42        domain: String,
43    },
44    Avail {
45        domain: String,
46    },
47    Info {
48        domain: String,
49    },
50}
51
52/// The data returned from a bulk operation (varies by operation type).
53#[derive(Debug, Clone, Serialize, Deserialize)]
54#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
55pub enum BulkResultData {
56    Whois(WhoisResponse),
57    Rdap(Box<RdapResponse>),
58    Dns(Vec<DnsRecord>),
59    Propagation(PropagationResult),
60    Lookup(LookupResult),
61    Status(StatusResponse),
62    Avail(AvailabilityResult),
63    Info(crate::domain_info::DomainInfo),
64}
65
66/// Result of a single operation within a bulk execution.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct BulkResult {
69    pub operation: BulkOperation,
70    pub success: bool,
71    pub data: Option<BulkResultData>,
72    pub error: Option<String>,
73    pub duration_ms: u64,
74}
75
76/// Executes bulk operations concurrently with rate limiting.
77#[derive(Debug, Clone)]
78pub struct BulkExecutor {
79    concurrency: usize,
80    rate_limit_delay: Duration,
81    whois_client: WhoisClient,
82    rdap_client: RdapClient,
83    dns_resolver: DnsResolver,
84    propagation_checker: PropagationChecker,
85    smart_lookup: SmartLookup,
86    status_client: StatusClient,
87    availability_checker: AvailabilityChecker,
88}
89
90impl Default for BulkExecutor {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96impl BulkExecutor {
97    pub fn new() -> Self {
98        Self {
99            concurrency: 10,
100            rate_limit_delay: Duration::from_millis(100),
101            whois_client: WhoisClient::new(),
102            rdap_client: RdapClient::new(),
103            dns_resolver: DnsResolver::new(),
104            propagation_checker: PropagationChecker::new(),
105            smart_lookup: SmartLookup::new(),
106            status_client: StatusClient::new(),
107            availability_checker: AvailabilityChecker::new(),
108        }
109    }
110
111    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
112        self.concurrency = concurrency.clamp(1, 50);
113        self
114    }
115
116    pub fn with_rate_limit(mut self, delay: Duration) -> Self {
117        self.rate_limit_delay = delay;
118        self
119    }
120
121    #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
122    pub async fn execute(
123        &self,
124        operations: Vec<BulkOperation>,
125        progress: Option<ProgressCallback>,
126    ) -> Vec<BulkResult> {
127        let total = operations.len();
128        let completed = Arc::new(AtomicUsize::new(0));
129
130        info!("bulk operation started");
131        debug!(
132            total = total,
133            concurrency = self.concurrency,
134            "Starting bulk execution"
135        );
136
137        let results: Vec<BulkResult> = stream::iter(operations)
138            .map(|op| {
139                let completed = completed.clone();
140                let progress = progress.as_ref();
141                let rate_limit_delay = self.rate_limit_delay;
142                let whois_client = &self.whois_client;
143                let rdap_client = &self.rdap_client;
144                let dns_resolver = &self.dns_resolver;
145                let propagation_checker = &self.propagation_checker;
146                let smart_lookup = &self.smart_lookup;
147                let status_client = &self.status_client;
148                let availability_checker = &self.availability_checker;
149
150                async move {
151                    // Rate limiting delay
152                    if !rate_limit_delay.is_zero() {
153                        sleep(rate_limit_delay).await;
154                    }
155
156                    let start = std::time::Instant::now();
157                    let result = execute_operation(
158                        &op,
159                        &Clients {
160                            whois: whois_client,
161                            rdap: rdap_client,
162                            dns: dns_resolver,
163                            propagation: propagation_checker,
164                            lookup: smart_lookup,
165                            status: status_client,
166                            avail: availability_checker,
167                        },
168                    )
169                    .await;
170                    let duration_ms = start.elapsed().as_millis() as u64;
171
172                    let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
173
174                    if let Some(progress) = progress {
175                        let desc = match &op {
176                            BulkOperation::Whois { domain }
177                            | BulkOperation::Rdap { domain }
178                            | BulkOperation::Dns { domain, .. }
179                            | BulkOperation::Propagation { domain, .. }
180                            | BulkOperation::Lookup { domain }
181                            | BulkOperation::Status { domain }
182                            | BulkOperation::Avail { domain }
183                            | BulkOperation::Info { domain } => domain.as_str(),
184                        };
185                        progress(count, total, desc);
186                    }
187
188                    match result {
189                        Ok(data) => BulkResult {
190                            operation: op,
191                            success: true,
192                            data: Some(data),
193                            error: None,
194                            duration_ms,
195                        },
196                        Err(e) => {
197                            warn!(error = %e, "Bulk operation failed");
198                            BulkResult {
199                                operation: op,
200                                success: false,
201                                data: None,
202                                error: Some(e.to_string()),
203                                duration_ms,
204                            }
205                        }
206                    }
207                }
208            })
209            .buffer_unordered(self.concurrency)
210            .collect()
211            .await;
212
213        let succeeded = results.iter().filter(|r| r.success).count();
214        let failed = results.iter().filter(|r| !r.success).count();
215        info!(
216            total = total,
217            succeeded = succeeded,
218            failed = failed,
219            "bulk operation completed"
220        );
221
222        results
223    }
224
225    pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
226        let operations = domains
227            .into_iter()
228            .map(|domain| BulkOperation::Whois { domain })
229            .collect();
230        self.execute(operations, None).await
231    }
232
233    pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
234        let operations = domains
235            .into_iter()
236            .map(|domain| BulkOperation::Rdap { domain })
237            .collect();
238        self.execute(operations, None).await
239    }
240
241    pub async fn execute_dns(
242        &self,
243        domains: Vec<String>,
244        record_type: RecordType,
245    ) -> Vec<BulkResult> {
246        let operations = domains
247            .into_iter()
248            .map(|domain| BulkOperation::Dns {
249                domain,
250                record_type,
251            })
252            .collect();
253        self.execute(operations, None).await
254    }
255
256    pub async fn execute_propagation(
257        &self,
258        domains: Vec<String>,
259        record_type: RecordType,
260    ) -> Vec<BulkResult> {
261        let operations = domains
262            .into_iter()
263            .map(|domain| BulkOperation::Propagation {
264                domain,
265                record_type,
266            })
267            .collect();
268        self.execute(operations, None).await
269    }
270
271    pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
272        let operations = domains
273            .into_iter()
274            .map(|domain| BulkOperation::Lookup { domain })
275            .collect();
276        self.execute(operations, None).await
277    }
278
279    pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
280        let operations = domains
281            .into_iter()
282            .map(|domain| BulkOperation::Status { domain })
283            .collect();
284        self.execute(operations, None).await
285    }
286
287    pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
288        let operations = domains
289            .into_iter()
290            .map(|domain| BulkOperation::Avail { domain })
291            .collect();
292        self.execute(operations, None).await
293    }
294
295    pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
296        let operations = domains
297            .into_iter()
298            .map(|domain| BulkOperation::Info { domain })
299            .collect();
300        self.execute(operations, None).await
301    }
302}
303
304struct Clients<'a> {
305    whois: &'a WhoisClient,
306    rdap: &'a RdapClient,
307    dns: &'a DnsResolver,
308    propagation: &'a PropagationChecker,
309    lookup: &'a SmartLookup,
310    status: &'a StatusClient,
311    avail: &'a AvailabilityChecker,
312}
313
314async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
315    match op {
316        BulkOperation::Whois { domain } => {
317            let result = clients.whois.lookup(domain).await?;
318            Ok(BulkResultData::Whois(result))
319        }
320        BulkOperation::Rdap { domain } => {
321            let result = clients.rdap.lookup_domain(domain).await?;
322            Ok(BulkResultData::Rdap(Box::new(result)))
323        }
324        BulkOperation::Dns {
325            domain,
326            record_type,
327        } => {
328            let result = clients.dns.resolve(domain, *record_type, None).await?;
329            Ok(BulkResultData::Dns(result))
330        }
331        BulkOperation::Propagation {
332            domain,
333            record_type,
334        } => {
335            let result = clients.propagation.check(domain, *record_type).await?;
336            Ok(BulkResultData::Propagation(result))
337        }
338        BulkOperation::Lookup { domain } => {
339            let result = clients.lookup.lookup(domain).await?;
340            Ok(BulkResultData::Lookup(result))
341        }
342        BulkOperation::Status { domain } => {
343            let result = clients.status.check(domain).await?;
344            Ok(BulkResultData::Status(result))
345        }
346        BulkOperation::Avail { domain } => {
347            let result = clients.avail.check(domain).await?;
348            Ok(BulkResultData::Avail(result))
349        }
350        BulkOperation::Info { domain } => {
351            let result = clients.lookup.lookup(domain).await?;
352            Ok(BulkResultData::Info(
353                crate::domain_info::DomainInfo::from_lookup_result(&result),
354            ))
355        }
356    }
357}
358
/// Keywords commonly used as CSV header labels for a domain column.
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// UTF-8 byte-order mark. Some editors and spreadsheet exports prepend it to
/// text files, and `str::trim` does not remove it.
const UTF8_BOM: char = '\u{feff}';

/// Returns the domain column of a line: the first comma-separated field,
/// trimmed, with any surrounding double quotes (quoted CSV fields) removed.
fn first_csv_column(line: &str) -> &str {
    line.split(',')
        .next()
        .unwrap_or(line)
        .trim()
        .trim_matches('"')
        .trim()
}

/// Returns true if `first` looks like a CSV header row rather than a real domain.
///
/// Heuristic: take the first column (see [`first_csv_column`]). Real domains
/// always contain a `.` (e.g. "example.com", "domain.com"); a bare keyword like
/// "domain" or "hostname" does not. The row is treated as a header only when
/// that column has no dot AND matches a known header keyword
/// (case-insensitively).
fn is_csv_header_row(first: &str) -> bool {
    let first_col = first_csv_column(first);
    if first_col.contains('.') {
        return false;
    }
    let label = first_col.to_lowercase();
    HEADER_KEYWORDS.contains(&label.as_str())
}

/// Extracts domains from a plain-text or CSV domain list.
///
/// Processing steps:
/// - A leading UTF-8 BOM is ignored.
/// - Blank lines and lines starting with `#` are skipped.
/// - For CSV rows, only the first column is used, with surrounding quotes
///   removed.
/// - Entries without a `.` (including bare header keywords) are dropped.
pub fn parse_domains_from_file(content: &str) -> Vec<String> {
    // Without this, a BOM would stay glued to the first domain.
    let content = content.strip_prefix(UTF8_BOM).unwrap_or(content);

    let mut domains: Vec<String> = content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .map(first_csv_column)
        .filter(|domain| domain.contains('.'))
        .map(str::to_string)
        .collect();

    // Defensive guard. Every entry that reaches this point contains a dot, and
    // `is_csv_header_row` rejects dotted values, so today this never removes
    // anything. Bare-keyword headers ("domain", "hostname") are already dropped
    // by the dot filter above. The guard only matters if that filter is ever
    // relaxed.
    if matches!(domains.first(), Some(first) if is_csv_header_row(first)) {
        domains.remove(0);
    }

    domains
}
407
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_domains_from_file() {
        let content = r#"
# This is a comment
example1.com
google2.com
  whitespace3.com
invalid
csv,format,example.org
"#;

        let parsed = parse_domains_from_file(content);
        assert_eq!(parsed.len(), 3);
        // "invalid" and the leading "csv" column carry no dot, so both are dropped.
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(parsed.iter().any(|domain| domain == expected));
        }
    }

    #[test]
    fn test_parse_domains_skip_bare_header() {
        // The bare keyword "domain" has no dot, so the dot filter removes it.
        let parsed = parse_domains_from_file("domain\nexample.com\n");
        assert_eq!(parsed, ["example.com"]);
    }

    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        // Column one of the header row is the bare keyword "domain" (no dot),
        // so the whole row is discarded.
        let parsed = parse_domains_from_file("domain,status\ngoogle.com,live\n");
        assert_eq!(parsed, ["google.com"]);
        assert!(parsed.iter().all(|domain| domain != "domain"));
    }

    #[test]
    fn first_domain_with_no_digits_is_kept() {
        // Regression guard: an earlier heuristic discarded the first entry when
        // it contained no ASCII digits, silently losing "google.com" etc.
        let parsed = parse_domains_from_file("google.com\namazon.com\n");
        assert_eq!(parsed, ["google.com", "amazon.com"]);
    }

    #[test]
    fn header_row_named_hostname_is_dropped() {
        let parsed = parse_domains_from_file("hostname\nexample.com\n");
        assert_eq!(parsed, ["example.com"]);
    }

    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        // The dot marks "domain.com" as a real domain rather than a header.
        let parsed = parse_domains_from_file("domain.com\nexample.com\n");
        assert_eq!(parsed, ["domain.com", "example.com"]);
    }

    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        for row in ["domain", "Hostname", "URL", "domain,status,notes", "  host  "] {
            assert!(is_csv_header_row(row));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        for row in ["domain.com", "google.com", "host.name"] {
            assert!(!is_csv_header_row(row));
        }
    }

    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        for row in ["example", "mydata"] {
            assert!(!is_csv_header_row(row));
        }
    }
}