Skip to main content

seer_core/bulk/
executor.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::sync::Semaphore;
8use tokio::time::sleep;
9use tracing::{debug, info, instrument, warn};
10
11use crate::availability::{AvailabilityChecker, AvailabilityResult};
12use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
13use crate::error::Result;
14use crate::lookup::{LookupResult, SmartLookup};
15use crate::rdap::{RdapClient, RdapResponse};
16use crate::status::{StatusClient, StatusResponse};
17use crate::whois::{WhoisClient, WhoisResponse};
18
/// Progress hook for [`BulkExecutor::execute`].
///
/// Invoked once per finished operation with `(completed, total, domain)`:
/// - the number of operations finished so far, including this one;
/// - the total number of operations in the batch;
/// - the domain the finished operation targeted.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Sync + Send>;
20
21/// A single operation to perform in a bulk execution batch.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "type", rename_all = "snake_case")]
24pub enum BulkOperation {
25    Whois {
26        domain: String,
27    },
28    Rdap {
29        domain: String,
30    },
31    Dns {
32        domain: String,
33        record_type: RecordType,
34    },
35    Propagation {
36        domain: String,
37        record_type: RecordType,
38    },
39    Lookup {
40        domain: String,
41    },
42    Status {
43        domain: String,
44    },
45    Avail {
46        domain: String,
47    },
48    Info {
49        domain: String,
50    },
51}
52
53/// The data returned from a bulk operation (varies by operation type).
54#[derive(Debug, Clone, Serialize, Deserialize)]
55#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
56pub enum BulkResultData {
57    Whois(WhoisResponse),
58    Rdap(Box<RdapResponse>),
59    Dns(Vec<DnsRecord>),
60    Propagation(PropagationResult),
61    Lookup(LookupResult),
62    Status(StatusResponse),
63    Avail(AvailabilityResult),
64    Info(crate::domain_info::DomainInfo),
65}
66
67/// Result of a single operation within a bulk execution.
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct BulkResult {
70    pub operation: BulkOperation,
71    pub success: bool,
72    pub data: Option<BulkResultData>,
73    pub error: Option<String>,
74    pub duration_ms: u64,
75}
76
77/// Executes bulk operations concurrently with rate limiting.
78#[derive(Debug, Clone)]
79pub struct BulkExecutor {
80    concurrency: usize,
81    rate_limit_delay: Duration,
82    whois_client: WhoisClient,
83    rdap_client: RdapClient,
84    dns_resolver: DnsResolver,
85    propagation_checker: PropagationChecker,
86    smart_lookup: SmartLookup,
87    status_client: StatusClient,
88    availability_checker: AvailabilityChecker,
89}
90
91impl Default for BulkExecutor {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl BulkExecutor {
98    pub fn new() -> Self {
99        Self {
100            concurrency: 10,
101            rate_limit_delay: Duration::from_millis(100),
102            whois_client: WhoisClient::new(),
103            rdap_client: RdapClient::new(),
104            dns_resolver: DnsResolver::new(),
105            propagation_checker: PropagationChecker::new(),
106            smart_lookup: SmartLookup::new(),
107            status_client: StatusClient::new(),
108            availability_checker: AvailabilityChecker::new(),
109        }
110    }
111
112    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
113        self.concurrency = concurrency.clamp(1, 50);
114        self
115    }
116
117    pub fn with_rate_limit(mut self, delay: Duration) -> Self {
118        self.rate_limit_delay = delay;
119        self
120    }
121
122    #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
123    pub async fn execute(
124        &self,
125        operations: Vec<BulkOperation>,
126        progress: Option<ProgressCallback>,
127    ) -> Vec<BulkResult> {
128        let total = operations.len();
129        let completed = Arc::new(AtomicUsize::new(0));
130        let semaphore = Arc::new(Semaphore::new(self.concurrency));
131
132        info!("bulk operation started");
133        debug!(
134            total = total,
135            concurrency = self.concurrency,
136            "Starting bulk execution"
137        );
138
139        let results: Vec<BulkResult> = stream::iter(operations)
140            .map(|op| {
141                let semaphore = semaphore.clone();
142                let completed = completed.clone();
143                let progress = progress.as_ref();
144                let rate_limit_delay = self.rate_limit_delay;
145                let whois_client = &self.whois_client;
146                let rdap_client = &self.rdap_client;
147                let dns_resolver = &self.dns_resolver;
148                let propagation_checker = &self.propagation_checker;
149                let smart_lookup = &self.smart_lookup;
150                let status_client = &self.status_client;
151                let availability_checker = &self.availability_checker;
152
153                async move {
154                    let Ok(_permit) = semaphore.acquire().await else {
155                        return BulkResult {
156                            operation: op,
157                            success: false,
158                            data: None,
159                            error: Some("Operation cancelled".to_string()),
160                            duration_ms: 0,
161                        };
162                    };
163
164                    // Rate limiting delay
165                    if !rate_limit_delay.is_zero() {
166                        sleep(rate_limit_delay).await;
167                    }
168
169                    let start = std::time::Instant::now();
170                    let result = execute_operation(
171                        &op,
172                        &Clients {
173                            whois: whois_client,
174                            rdap: rdap_client,
175                            dns: dns_resolver,
176                            propagation: propagation_checker,
177                            lookup: smart_lookup,
178                            status: status_client,
179                            avail: availability_checker,
180                        },
181                    )
182                    .await;
183                    let duration_ms = start.elapsed().as_millis() as u64;
184
185                    let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
186
187                    if let Some(progress) = progress {
188                        let desc = match &op {
189                            BulkOperation::Whois { domain }
190                            | BulkOperation::Rdap { domain }
191                            | BulkOperation::Dns { domain, .. }
192                            | BulkOperation::Propagation { domain, .. }
193                            | BulkOperation::Lookup { domain }
194                            | BulkOperation::Status { domain }
195                            | BulkOperation::Avail { domain }
196                            | BulkOperation::Info { domain } => domain.as_str(),
197                        };
198                        progress(count, total, desc);
199                    }
200
201                    match result {
202                        Ok(data) => BulkResult {
203                            operation: op,
204                            success: true,
205                            data: Some(data),
206                            error: None,
207                            duration_ms,
208                        },
209                        Err(e) => {
210                            warn!(error = %e, "Bulk operation failed");
211                            BulkResult {
212                                operation: op,
213                                success: false,
214                                data: None,
215                                error: Some(e.to_string()),
216                                duration_ms,
217                            }
218                        }
219                    }
220                }
221            })
222            .buffer_unordered(self.concurrency)
223            .collect()
224            .await;
225
226        let succeeded = results.iter().filter(|r| r.success).count();
227        let failed = results.iter().filter(|r| !r.success).count();
228        info!(
229            total = total,
230            succeeded = succeeded,
231            failed = failed,
232            "bulk operation completed"
233        );
234
235        results
236    }
237
238    pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
239        let operations = domains
240            .into_iter()
241            .map(|domain| BulkOperation::Whois { domain })
242            .collect();
243        self.execute(operations, None).await
244    }
245
246    pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
247        let operations = domains
248            .into_iter()
249            .map(|domain| BulkOperation::Rdap { domain })
250            .collect();
251        self.execute(operations, None).await
252    }
253
254    pub async fn execute_dns(
255        &self,
256        domains: Vec<String>,
257        record_type: RecordType,
258    ) -> Vec<BulkResult> {
259        let operations = domains
260            .into_iter()
261            .map(|domain| BulkOperation::Dns {
262                domain,
263                record_type,
264            })
265            .collect();
266        self.execute(operations, None).await
267    }
268
269    pub async fn execute_propagation(
270        &self,
271        domains: Vec<String>,
272        record_type: RecordType,
273    ) -> Vec<BulkResult> {
274        let operations = domains
275            .into_iter()
276            .map(|domain| BulkOperation::Propagation {
277                domain,
278                record_type,
279            })
280            .collect();
281        self.execute(operations, None).await
282    }
283
284    pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
285        let operations = domains
286            .into_iter()
287            .map(|domain| BulkOperation::Lookup { domain })
288            .collect();
289        self.execute(operations, None).await
290    }
291
292    pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
293        let operations = domains
294            .into_iter()
295            .map(|domain| BulkOperation::Status { domain })
296            .collect();
297        self.execute(operations, None).await
298    }
299
300    pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
301        let operations = domains
302            .into_iter()
303            .map(|domain| BulkOperation::Avail { domain })
304            .collect();
305        self.execute(operations, None).await
306    }
307
308    pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
309        let operations = domains
310            .into_iter()
311            .map(|domain| BulkOperation::Info { domain })
312            .collect();
313        self.execute(operations, None).await
314    }
315}
316
317struct Clients<'a> {
318    whois: &'a WhoisClient,
319    rdap: &'a RdapClient,
320    dns: &'a DnsResolver,
321    propagation: &'a PropagationChecker,
322    lookup: &'a SmartLookup,
323    status: &'a StatusClient,
324    avail: &'a AvailabilityChecker,
325}
326
327async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
328    match op {
329        BulkOperation::Whois { domain } => {
330            let result = clients.whois.lookup(domain).await?;
331            Ok(BulkResultData::Whois(result))
332        }
333        BulkOperation::Rdap { domain } => {
334            let result = clients.rdap.lookup_domain(domain).await?;
335            Ok(BulkResultData::Rdap(Box::new(result)))
336        }
337        BulkOperation::Dns {
338            domain,
339            record_type,
340        } => {
341            let result = clients.dns.resolve(domain, *record_type, None).await?;
342            Ok(BulkResultData::Dns(result))
343        }
344        BulkOperation::Propagation {
345            domain,
346            record_type,
347        } => {
348            let result = clients.propagation.check(domain, *record_type).await?;
349            Ok(BulkResultData::Propagation(result))
350        }
351        BulkOperation::Lookup { domain } => {
352            let result = clients.lookup.lookup(domain).await?;
353            Ok(BulkResultData::Lookup(result))
354        }
355        BulkOperation::Status { domain } => {
356            let result = clients.status.check(domain).await?;
357            Ok(BulkResultData::Status(result))
358        }
359        BulkOperation::Avail { domain } => {
360            let result = clients.avail.check(domain).await?;
361            Ok(BulkResultData::Avail(result))
362        }
363        BulkOperation::Info { domain } => {
364            let result = clients.lookup.lookup(domain).await?;
365            Ok(BulkResultData::Info(
366                crate::domain_info::DomainInfo::from_lookup_result(&result),
367            ))
368        }
369    }
370}
371
/// Extracts domain names from the text of a domain-list file.
///
/// Accepts one domain per line or CSV rows (only the first column is used).
/// Lines are trimmed; blank lines and lines starting with `#` are ignored; entries
/// without a `.` are dropped. The order of the remaining entries is preserved.
///
/// A CSV header is skipped when the first data line is a CSV row (contains a comma)
/// whose first column has no ASCII digits, e.g. `host.name,owner,notes`. Plain
/// one-domain-per-line files never lose their first entry. A header-less CSV whose
/// first domain contains no digit is still treated as having a header — the
/// heuristic cannot tell the two apart.
pub fn parse_domains_from_file(content: &str) -> Vec<String> {
    // First comma-separated field of `line`, trimmed.
    fn first_column(line: &str) -> &str {
        line.split(',').next().unwrap_or(line).trim()
    }

    // A CSV row whose first column contains no digits is probably a header.
    fn looks_like_csv_header(line: &str) -> bool {
        line.contains(',') && !first_column(line).chars().any(|c| c.is_ascii_digit())
    }

    let mut lines = content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .peekable();

    // Inspect the first data *line*, not the first extracted domain. A digit-free
    // domain such as `example.com` at the top of a plain list is then kept. A dot-less
    // header like `domain,owner` (dropped by the `.` filter anyway) can then no longer
    // cause the following real row to be discarded.
    if matches!(lines.peek(), Some(first) if looks_like_csv_header(first)) {
        lines.next();
    }

    lines
        .map(first_column)
        .filter(|domain| domain.contains('.'))
        .map(str::to_string)
        .collect()
}
393
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_domains_from_file() {
        let content = concat!(
            "\n",
            "# This is a comment\n",
            "example1.com\n",
            "google2.com\n",
            "  whitespace3.com\n",
            "invalid\n",
            "csv,format,example.org\n",
        );

        let parsed = parse_domains_from_file(content);

        // "invalid" and the CSV first column "csv" have no dot, so only three survive.
        assert_eq!(parsed.len(), 3);
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(parsed.iter().any(|domain| domain == expected));
        }
    }

    #[test]
    fn test_parse_domains_skip_header() {
        let parsed = parse_domains_from_file("host.name,owner,notes\nexample.com,Alice,Main\n");
        assert_eq!(parsed, ["example.com"]);
    }
}
419        let content = "host.name,owner,notes\nexample.com,Alice,Main\n";
420        let domains = parse_domains_from_file(content);
421        assert_eq!(domains, vec!["example.com"]);
422    }
423}