1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::time::sleep;
8use tracing::{debug, info, instrument};
9
10use crate::availability::{AvailabilityChecker, AvailabilityResult};
11use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
12use crate::error::Result;
13use crate::lookup::{LookupResult, SmartLookup};
14use crate::rdap::{RdapClient, RdapResponse};
15use crate::status::{StatusClient, StatusResponse};
16use crate::whois::{WhoisClient, WhoisResponse};
17
/// Callback invoked by [`BulkExecutor::execute`] each time one operation finishes.
///
/// Arguments, in order: the number of operations completed so far, the total
/// number of operations in the batch, and the domain of the operation that
/// just finished.
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync>;
19
/// One unit of work for a bulk run; every variant targets a single domain.
///
/// Serialized with the variant name stored in a `type` field, written in
/// snake_case.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BulkOperation {
    /// WHOIS lookup for `domain`.
    Whois {
        domain: String,
    },
    /// RDAP domain lookup for `domain`.
    Rdap {
        domain: String,
    },
    /// DNS resolution of `record_type` records for `domain`.
    Dns {
        domain: String,
        record_type: RecordType,
    },
    /// Propagation check of `record_type` records for `domain`.
    Propagation {
        domain: String,
        record_type: RecordType,
    },
    /// Lookup through the `SmartLookup` client.
    Lookup {
        domain: String,
    },
    /// Status check for `domain`.
    Status {
        domain: String,
    },
    /// Availability check for `domain`.
    Avail {
        domain: String,
    },
    /// `SmartLookup` lookup whose result is converted into a `DomainInfo`.
    Info {
        domain: String,
    },
}
51
/// Payload of a successful [`BulkOperation`], one variant per operation kind.
///
/// Serialized with the snake_case variant name in a `result_type` field and
/// the payload in a sibling `data` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
pub enum BulkResultData {
    /// Response of a WHOIS lookup.
    Whois(WhoisResponse),
    /// Response of an RDAP lookup (boxed).
    Rdap(Box<RdapResponse>),
    /// Records returned by a DNS resolution.
    Dns(Vec<DnsRecord>),
    /// Outcome of a propagation check.
    Propagation(PropagationResult),
    /// Result of a `SmartLookup` lookup.
    Lookup(LookupResult),
    /// Response of a status check.
    Status(StatusResponse),
    /// Result of an availability check.
    Avail(AvailabilityResult),
    /// Domain info derived from a `SmartLookup` result.
    Info(crate::domain_info::DomainInfo),
}
65
/// Outcome of one [`BulkOperation`] as produced by [`BulkExecutor::execute`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkResult {
    /// The operation this result belongs to.
    pub operation: BulkOperation,
    /// `true` when the operation returned data, `false` when it returned an error.
    pub success: bool,
    /// Payload of a successful operation; `None` on failure.
    pub data: Option<BulkResultData>,
    /// String form of the error for a failed operation; `None` on success.
    pub error: Option<String>,
    /// Time spent running the operation, in milliseconds. The timer starts
    /// after the rate-limit sleep, so that delay is not included.
    pub duration_ms: u64,
}
75
/// Runs batches of [`BulkOperation`]s concurrently against a fixed set of clients.
#[derive(Debug, Clone)]
pub struct BulkExecutor {
    /// Maximum number of operations in flight at once (10 by default;
    /// `with_concurrency` clamps it to 1..=50).
    concurrency: usize,
    /// Delay each operation sleeps before it starts (100 ms by default;
    /// a zero duration skips the sleep).
    rate_limit_delay: Duration,
    /// Client used for `Whois` operations.
    whois_client: WhoisClient,
    /// Client used for `Rdap` operations.
    rdap_client: RdapClient,
    /// Resolver used for `Dns` operations.
    dns_resolver: DnsResolver,
    /// Checker used for `Propagation` operations.
    propagation_checker: PropagationChecker,
    /// Client used for `Lookup` and `Info` operations.
    smart_lookup: SmartLookup,
    /// Client used for `Status` operations.
    status_client: StatusClient,
    /// Checker used for `Avail` operations.
    availability_checker: AvailabilityChecker,
}
89
90impl Default for BulkExecutor {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96impl BulkExecutor {
97 pub fn new() -> Self {
98 Self {
99 concurrency: 10,
100 rate_limit_delay: Duration::from_millis(100),
101 whois_client: WhoisClient::new(),
102 rdap_client: RdapClient::new(),
103 dns_resolver: DnsResolver::new(),
104 propagation_checker: PropagationChecker::new(),
105 smart_lookup: SmartLookup::new(),
106 status_client: StatusClient::new(),
107 availability_checker: AvailabilityChecker::new(),
108 }
109 }
110
111 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
112 self.concurrency = concurrency.clamp(1, 50);
113 self
114 }
115
116 pub fn with_rate_limit(mut self, delay: Duration) -> Self {
117 self.rate_limit_delay = delay;
118 self
119 }
120
121 #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
122 pub async fn execute(
123 &self,
124 operations: Vec<BulkOperation>,
125 progress: Option<ProgressCallback>,
126 ) -> Vec<BulkResult> {
127 let total = operations.len();
128 let completed = Arc::new(AtomicUsize::new(0));
129
130 info!("bulk operation started");
131 debug!(
132 total = total,
133 concurrency = self.concurrency,
134 "Starting bulk execution"
135 );
136
137 let results: Vec<BulkResult> = stream::iter(operations)
138 .map(|op| {
139 let completed = completed.clone();
140 let progress = progress.as_ref();
141 let rate_limit_delay = self.rate_limit_delay;
142 let whois_client = &self.whois_client;
143 let rdap_client = &self.rdap_client;
144 let dns_resolver = &self.dns_resolver;
145 let propagation_checker = &self.propagation_checker;
146 let smart_lookup = &self.smart_lookup;
147 let status_client = &self.status_client;
148 let availability_checker = &self.availability_checker;
149
150 async move {
151 if !rate_limit_delay.is_zero() {
153 sleep(rate_limit_delay).await;
154 }
155
156 let start = std::time::Instant::now();
157 let result = execute_operation(
158 &op,
159 &Clients {
160 whois: whois_client,
161 rdap: rdap_client,
162 dns: dns_resolver,
163 propagation: propagation_checker,
164 lookup: smart_lookup,
165 status: status_client,
166 avail: availability_checker,
167 },
168 )
169 .await;
170 let duration_ms = start.elapsed().as_millis() as u64;
171
172 let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
173
174 if let Some(progress) = progress {
175 let desc = match &op {
176 BulkOperation::Whois { domain }
177 | BulkOperation::Rdap { domain }
178 | BulkOperation::Dns { domain, .. }
179 | BulkOperation::Propagation { domain, .. }
180 | BulkOperation::Lookup { domain }
181 | BulkOperation::Status { domain }
182 | BulkOperation::Avail { domain }
183 | BulkOperation::Info { domain } => domain.as_str(),
184 };
185 progress(count, total, desc);
186 }
187
188 match result {
189 Ok(data) => BulkResult {
190 operation: op,
191 success: true,
192 data: Some(data),
193 error: None,
194 duration_ms,
195 },
196 Err(e) => {
197 debug!(error = %e, "Bulk operation failed");
198 BulkResult {
199 operation: op,
200 success: false,
201 data: None,
202 error: Some(e.to_string()),
203 duration_ms,
204 }
205 }
206 }
207 }
208 })
209 .buffer_unordered(self.concurrency)
210 .collect()
211 .await;
212
213 let succeeded = results.iter().filter(|r| r.success).count();
214 let failed = results.iter().filter(|r| !r.success).count();
215 info!(
216 total = total,
217 succeeded = succeeded,
218 failed = failed,
219 "bulk operation completed"
220 );
221
222 results
223 }
224
225 pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
226 let operations = domains
227 .into_iter()
228 .map(|domain| BulkOperation::Whois { domain })
229 .collect();
230 self.execute(operations, None).await
231 }
232
233 pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
234 let operations = domains
235 .into_iter()
236 .map(|domain| BulkOperation::Rdap { domain })
237 .collect();
238 self.execute(operations, None).await
239 }
240
241 pub async fn execute_dns(
242 &self,
243 domains: Vec<String>,
244 record_type: RecordType,
245 ) -> Vec<BulkResult> {
246 let operations = domains
247 .into_iter()
248 .map(|domain| BulkOperation::Dns {
249 domain,
250 record_type,
251 })
252 .collect();
253 self.execute(operations, None).await
254 }
255
256 pub async fn execute_propagation(
257 &self,
258 domains: Vec<String>,
259 record_type: RecordType,
260 ) -> Vec<BulkResult> {
261 let operations = domains
262 .into_iter()
263 .map(|domain| BulkOperation::Propagation {
264 domain,
265 record_type,
266 })
267 .collect();
268 self.execute(operations, None).await
269 }
270
271 pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
272 let operations = domains
273 .into_iter()
274 .map(|domain| BulkOperation::Lookup { domain })
275 .collect();
276 self.execute(operations, None).await
277 }
278
279 pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
280 let operations = domains
281 .into_iter()
282 .map(|domain| BulkOperation::Status { domain })
283 .collect();
284 self.execute(operations, None).await
285 }
286
287 pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
288 let operations = domains
289 .into_iter()
290 .map(|domain| BulkOperation::Avail { domain })
291 .collect();
292 self.execute(operations, None).await
293 }
294
295 pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
296 let operations = domains
297 .into_iter()
298 .map(|domain| BulkOperation::Info { domain })
299 .collect();
300 self.execute(operations, None).await
301 }
302}
303
/// Borrowed handles to every client `execute_operation` may need, bundled so
/// they can be passed as a single argument.
struct Clients<'a> {
    /// Used for `BulkOperation::Whois`.
    whois: &'a WhoisClient,
    /// Used for `BulkOperation::Rdap`.
    rdap: &'a RdapClient,
    /// Used for `BulkOperation::Dns`.
    dns: &'a DnsResolver,
    /// Used for `BulkOperation::Propagation`.
    propagation: &'a PropagationChecker,
    /// Used for `BulkOperation::Lookup` and `BulkOperation::Info`.
    lookup: &'a SmartLookup,
    /// Used for `BulkOperation::Status`.
    status: &'a StatusClient,
    /// Used for `BulkOperation::Avail`.
    avail: &'a AvailabilityChecker,
}
313
314async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
315 match op {
316 BulkOperation::Whois { domain } => {
317 let result = clients.whois.lookup(domain).await?;
318 Ok(BulkResultData::Whois(result))
319 }
320 BulkOperation::Rdap { domain } => {
321 let result = clients.rdap.lookup_domain(domain).await?;
322 Ok(BulkResultData::Rdap(Box::new(result)))
323 }
324 BulkOperation::Dns {
325 domain,
326 record_type,
327 } => {
328 let result = clients.dns.resolve(domain, *record_type, None).await?;
329 Ok(BulkResultData::Dns(result))
330 }
331 BulkOperation::Propagation {
332 domain,
333 record_type,
334 } => {
335 let result = clients.propagation.check(domain, *record_type).await?;
336 Ok(BulkResultData::Propagation(result))
337 }
338 BulkOperation::Lookup { domain } => {
339 let result = clients.lookup.lookup(domain).await?;
340 Ok(BulkResultData::Lookup(result))
341 }
342 BulkOperation::Status { domain } => {
343 let result = clients.status.check(domain).await?;
344 Ok(BulkResultData::Status(result))
345 }
346 BulkOperation::Avail { domain } => {
347 let result = clients.avail.check(domain).await?;
348 Ok(BulkResultData::Avail(result))
349 }
350 BulkOperation::Info { domain } => {
351 let result = clients.lookup.lookup(domain).await?;
352 Ok(BulkResultData::Info(
353 crate::domain_info::DomainInfo::from_lookup_result(&result),
354 ))
355 }
356 }
357}
358
/// Lowercase labels that mark the first cell of a CSV header row.
const HEADER_KEYWORDS: &[&str] = &["domain", "host", "hostname", "url", "name", "site", "fqdn"];

/// Returns `true` when the first comma-separated column of `first` is a known
/// header label (compared case-insensitively, surrounding whitespace ignored).
///
/// A first column containing a `.` is never a header, so real domains such as
/// `domain.com` or `host.name` are not mistaken for one.
fn is_csv_header_row(first: &str) -> bool {
    let first_col = first.split(',').next().unwrap_or(first).trim();

    // All keywords are ASCII, so an ASCII case-insensitive comparison is
    // enough and avoids allocating a lowercased copy.
    !first_col.contains('.')
        && HEADER_KEYWORDS
            .iter()
            .any(|keyword| first_col.eq_ignore_ascii_case(keyword))
}

/// Extracts domain names from the text of a domain list or CSV file.
///
/// Processing rules:
/// - a leading UTF-8 byte-order mark is ignored;
/// - blank lines and lines starting with `#` are skipped;
/// - a leading CSV header row (see `is_csv_header_row`) is skipped;
/// - only the first comma-separated column of each line is used, trimmed;
/// - values without a `.` are discarded.
///
/// Domains are returned in file order; duplicates are kept.
pub fn parse_domains_from_file(content: &str) -> Vec<String> {
    // `str::trim` does not treat U+FEFF as whitespace, so without this the
    // BOM would stay glued to the first entry (or hide a leading `#`).
    let content = content.strip_prefix('\u{feff}').unwrap_or(content);

    let mut rows = content
        .lines()
        .map(str::trim)
        .filter(|row| !row.is_empty() && !row.starts_with('#'))
        .peekable();

    // Drop the header while it is still the raw first row. The dot filter
    // below would also discard it, but checking here keeps the intent explicit
    // and avoids shifting the whole vector afterwards.
    let _ = rows.next_if(|row| is_csv_header_row(row));

    rows.map(|row| row.split(',').next().unwrap_or(row).trim())
        .filter(|domain| domain.contains('.'))
        .map(str::to_owned)
        .collect()
}
407
/// Unit tests for `parse_domains_from_file` and `is_csv_header_row`.
#[cfg(test)]
mod tests {
    use super::*;

    // Comments, blank lines, dot-less entries and CSV rows whose first column
    // is not a domain must all be skipped; surrounding whitespace is trimmed.
    #[test]
    fn test_parse_domains_from_file() {
        let content = r#"
# This is a comment
example1.com
google2.com
 whitespace3.com
invalid
csv,format,example.org
"#;

        let domains = parse_domains_from_file(content);
        assert_eq!(domains.len(), 3);
        assert!(domains.contains(&"example1.com".to_string()));
        assert!(domains.contains(&"google2.com".to_string()));
        assert!(domains.contains(&"whitespace3.com".to_string()));
    }

    // A single-column header line must not appear in the output.
    #[test]
    fn test_parse_domains_skip_bare_header() {
        let content = "domain\nexample.com\n";
        let domains = parse_domains_from_file(content);
        assert_eq!(domains, vec!["example.com"]);
    }

    // A multi-column header is dropped and only the first column of the data
    // row is returned.
    #[test]
    fn test_parse_domains_multi_column_csv_header_dropped() {
        let content = "domain,status\ngoogle.com,live\n";
        let domains = parse_domains_from_file(content);
        assert_eq!(domains, vec!["google.com"]);
        assert!(!domains.contains(&"domain".to_string()));
    }

    // The first entry must be kept when it is a real domain.
    #[test]
    fn first_domain_with_no_digits_is_kept() {
        let input = "google.com\namazon.com\n";
        let result = parse_domains_from_file(input);
        assert_eq!(result, vec!["google.com", "amazon.com"]);
    }

    // `hostname` is one of the recognised header labels.
    #[test]
    fn header_row_named_hostname_is_dropped() {
        let input = "hostname\nexample.com\n";
        let result = parse_domains_from_file(input);
        assert_eq!(result, vec!["example.com"]);
    }

    // A real domain that starts with a header keyword must survive.
    #[test]
    fn domain_dot_com_is_not_dropped_as_header() {
        let input = "domain.com\nexample.com\n";
        let result = parse_domains_from_file(input);
        assert_eq!(result, vec!["domain.com", "example.com"]);
    }

    // Keywords match regardless of case, padding, or trailing columns.
    #[test]
    fn is_csv_header_row_detects_bare_keywords() {
        assert!(is_csv_header_row("domain"));
        assert!(is_csv_header_row("Hostname"));
        assert!(is_csv_header_row("URL"));
        assert!(is_csv_header_row("domain,status,notes"));
        assert!(is_csv_header_row(" host "));
    }

    // Anything containing a dot is treated as a domain, never as a header.
    #[test]
    fn is_csv_header_row_rejects_dotted_values() {
        assert!(!is_csv_header_row("domain.com"));
        assert!(!is_csv_header_row("google.com"));
        assert!(!is_csv_header_row("host.name"));
    }

    // Dot-less words outside the keyword list are not headers.
    #[test]
    fn is_csv_header_row_rejects_non_keyword() {
        assert!(!is_csv_header_row("example"));
        assert!(!is_csv_header_row("mydata"));
    }
}