1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use futures::stream::{self, StreamExt};
6use serde::{Deserialize, Serialize};
7use tokio::sync::Semaphore;
8use tokio::time::sleep;
9use tracing::{debug, info, instrument, warn};
10
11use crate::availability::{AvailabilityChecker, AvailabilityResult};
12use crate::dns::{DnsRecord, DnsResolver, PropagationChecker, PropagationResult, RecordType};
13use crate::error::Result;
14use crate::lookup::{LookupResult, SmartLookup};
15use crate::rdap::{RdapClient, RdapResponse};
16use crate::status::{StatusClient, StatusResponse};
17use crate::whois::{WhoisClient, WhoisResponse};
18
/// Callback that [`BulkExecutor::execute`] invokes each time an operation finishes.
///
/// The arguments are, in order: the number of operations completed so far, the
/// total number of operations in the batch, and the domain of the operation that
/// just finished (whether it succeeded or failed).
pub type ProgressCallback = Box<dyn Fn(usize, usize, &str) + Send + Sync>;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "type", rename_all = "snake_case")]
24pub enum BulkOperation {
25 Whois {
26 domain: String,
27 },
28 Rdap {
29 domain: String,
30 },
31 Dns {
32 domain: String,
33 record_type: RecordType,
34 },
35 Propagation {
36 domain: String,
37 record_type: RecordType,
38 },
39 Lookup {
40 domain: String,
41 },
42 Status {
43 domain: String,
44 },
45 Avail {
46 domain: String,
47 },
48 Info {
49 domain: String,
50 },
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55#[serde(tag = "result_type", content = "data", rename_all = "snake_case")]
56pub enum BulkResultData {
57 Whois(WhoisResponse),
58 Rdap(Box<RdapResponse>),
59 Dns(Vec<DnsRecord>),
60 Propagation(PropagationResult),
61 Lookup(LookupResult),
62 Status(StatusResponse),
63 Avail(AvailabilityResult),
64 Info(crate::domain_info::DomainInfo),
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct BulkResult {
70 pub operation: BulkOperation,
71 pub success: bool,
72 pub data: Option<BulkResultData>,
73 pub error: Option<String>,
74 pub duration_ms: u64,
75}
76
77#[derive(Debug, Clone)]
79pub struct BulkExecutor {
80 concurrency: usize,
81 rate_limit_delay: Duration,
82 whois_client: WhoisClient,
83 rdap_client: RdapClient,
84 dns_resolver: DnsResolver,
85 propagation_checker: PropagationChecker,
86 smart_lookup: SmartLookup,
87 status_client: StatusClient,
88 availability_checker: AvailabilityChecker,
89}
90
91impl Default for BulkExecutor {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97impl BulkExecutor {
98 pub fn new() -> Self {
99 Self {
100 concurrency: 10,
101 rate_limit_delay: Duration::from_millis(100),
102 whois_client: WhoisClient::new(),
103 rdap_client: RdapClient::new(),
104 dns_resolver: DnsResolver::new(),
105 propagation_checker: PropagationChecker::new(),
106 smart_lookup: SmartLookup::new(),
107 status_client: StatusClient::new(),
108 availability_checker: AvailabilityChecker::new(),
109 }
110 }
111
112 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
113 self.concurrency = concurrency.clamp(1, 50);
114 self
115 }
116
117 pub fn with_rate_limit(mut self, delay: Duration) -> Self {
118 self.rate_limit_delay = delay;
119 self
120 }
121
122 #[instrument(skip(self, operations, progress), fields(count = operations.len(), concurrency = self.concurrency))]
123 pub async fn execute(
124 &self,
125 operations: Vec<BulkOperation>,
126 progress: Option<ProgressCallback>,
127 ) -> Vec<BulkResult> {
128 let total = operations.len();
129 let completed = Arc::new(AtomicUsize::new(0));
130 let semaphore = Arc::new(Semaphore::new(self.concurrency));
131
132 info!("bulk operation started");
133 debug!(
134 total = total,
135 concurrency = self.concurrency,
136 "Starting bulk execution"
137 );
138
139 let results: Vec<BulkResult> = stream::iter(operations)
140 .map(|op| {
141 let semaphore = semaphore.clone();
142 let completed = completed.clone();
143 let progress = progress.as_ref();
144 let rate_limit_delay = self.rate_limit_delay;
145 let whois_client = &self.whois_client;
146 let rdap_client = &self.rdap_client;
147 let dns_resolver = &self.dns_resolver;
148 let propagation_checker = &self.propagation_checker;
149 let smart_lookup = &self.smart_lookup;
150 let status_client = &self.status_client;
151 let availability_checker = &self.availability_checker;
152
153 async move {
154 let Ok(_permit) = semaphore.acquire().await else {
155 return BulkResult {
156 operation: op,
157 success: false,
158 data: None,
159 error: Some("Operation cancelled".to_string()),
160 duration_ms: 0,
161 };
162 };
163
164 if !rate_limit_delay.is_zero() {
166 sleep(rate_limit_delay).await;
167 }
168
169 let start = std::time::Instant::now();
170 let result = execute_operation(
171 &op,
172 &Clients {
173 whois: whois_client,
174 rdap: rdap_client,
175 dns: dns_resolver,
176 propagation: propagation_checker,
177 lookup: smart_lookup,
178 status: status_client,
179 avail: availability_checker,
180 },
181 )
182 .await;
183 let duration_ms = start.elapsed().as_millis() as u64;
184
185 let count = completed.fetch_add(1, Ordering::Relaxed) + 1;
186
187 if let Some(progress) = progress {
188 let desc = match &op {
189 BulkOperation::Whois { domain }
190 | BulkOperation::Rdap { domain }
191 | BulkOperation::Dns { domain, .. }
192 | BulkOperation::Propagation { domain, .. }
193 | BulkOperation::Lookup { domain }
194 | BulkOperation::Status { domain }
195 | BulkOperation::Avail { domain }
196 | BulkOperation::Info { domain } => domain.as_str(),
197 };
198 progress(count, total, desc);
199 }
200
201 match result {
202 Ok(data) => BulkResult {
203 operation: op,
204 success: true,
205 data: Some(data),
206 error: None,
207 duration_ms,
208 },
209 Err(e) => {
210 warn!(error = %e, "Bulk operation failed");
211 BulkResult {
212 operation: op,
213 success: false,
214 data: None,
215 error: Some(e.to_string()),
216 duration_ms,
217 }
218 }
219 }
220 }
221 })
222 .buffer_unordered(self.concurrency)
223 .collect()
224 .await;
225
226 let succeeded = results.iter().filter(|r| r.success).count();
227 let failed = results.iter().filter(|r| !r.success).count();
228 info!(
229 total = total,
230 succeeded = succeeded,
231 failed = failed,
232 "bulk operation completed"
233 );
234
235 results
236 }
237
238 pub async fn execute_whois(&self, domains: Vec<String>) -> Vec<BulkResult> {
239 let operations = domains
240 .into_iter()
241 .map(|domain| BulkOperation::Whois { domain })
242 .collect();
243 self.execute(operations, None).await
244 }
245
246 pub async fn execute_rdap(&self, domains: Vec<String>) -> Vec<BulkResult> {
247 let operations = domains
248 .into_iter()
249 .map(|domain| BulkOperation::Rdap { domain })
250 .collect();
251 self.execute(operations, None).await
252 }
253
254 pub async fn execute_dns(
255 &self,
256 domains: Vec<String>,
257 record_type: RecordType,
258 ) -> Vec<BulkResult> {
259 let operations = domains
260 .into_iter()
261 .map(|domain| BulkOperation::Dns {
262 domain,
263 record_type,
264 })
265 .collect();
266 self.execute(operations, None).await
267 }
268
269 pub async fn execute_propagation(
270 &self,
271 domains: Vec<String>,
272 record_type: RecordType,
273 ) -> Vec<BulkResult> {
274 let operations = domains
275 .into_iter()
276 .map(|domain| BulkOperation::Propagation {
277 domain,
278 record_type,
279 })
280 .collect();
281 self.execute(operations, None).await
282 }
283
284 pub async fn execute_lookup(&self, domains: Vec<String>) -> Vec<BulkResult> {
285 let operations = domains
286 .into_iter()
287 .map(|domain| BulkOperation::Lookup { domain })
288 .collect();
289 self.execute(operations, None).await
290 }
291
292 pub async fn execute_status(&self, domains: Vec<String>) -> Vec<BulkResult> {
293 let operations = domains
294 .into_iter()
295 .map(|domain| BulkOperation::Status { domain })
296 .collect();
297 self.execute(operations, None).await
298 }
299
300 pub async fn execute_avail(&self, domains: Vec<String>) -> Vec<BulkResult> {
301 let operations = domains
302 .into_iter()
303 .map(|domain| BulkOperation::Avail { domain })
304 .collect();
305 self.execute(operations, None).await
306 }
307
308 pub async fn execute_info(&self, domains: Vec<String>) -> Vec<BulkResult> {
309 let operations = domains
310 .into_iter()
311 .map(|domain| BulkOperation::Info { domain })
312 .collect();
313 self.execute(operations, None).await
314 }
315}
316
317struct Clients<'a> {
318 whois: &'a WhoisClient,
319 rdap: &'a RdapClient,
320 dns: &'a DnsResolver,
321 propagation: &'a PropagationChecker,
322 lookup: &'a SmartLookup,
323 status: &'a StatusClient,
324 avail: &'a AvailabilityChecker,
325}
326
327async fn execute_operation(op: &BulkOperation, clients: &Clients<'_>) -> Result<BulkResultData> {
328 match op {
329 BulkOperation::Whois { domain } => {
330 let result = clients.whois.lookup(domain).await?;
331 Ok(BulkResultData::Whois(result))
332 }
333 BulkOperation::Rdap { domain } => {
334 let result = clients.rdap.lookup_domain(domain).await?;
335 Ok(BulkResultData::Rdap(Box::new(result)))
336 }
337 BulkOperation::Dns {
338 domain,
339 record_type,
340 } => {
341 let result = clients.dns.resolve(domain, *record_type, None).await?;
342 Ok(BulkResultData::Dns(result))
343 }
344 BulkOperation::Propagation {
345 domain,
346 record_type,
347 } => {
348 let result = clients.propagation.check(domain, *record_type).await?;
349 Ok(BulkResultData::Propagation(result))
350 }
351 BulkOperation::Lookup { domain } => {
352 let result = clients.lookup.lookup(domain).await?;
353 Ok(BulkResultData::Lookup(result))
354 }
355 BulkOperation::Status { domain } => {
356 let result = clients.status.check(domain).await?;
357 Ok(BulkResultData::Status(result))
358 }
359 BulkOperation::Avail { domain } => {
360 let result = clients.avail.check(domain).await?;
361 Ok(BulkResultData::Avail(result))
362 }
363 BulkOperation::Info { domain } => {
364 let result = clients.lookup.lookup(domain).await?;
365 Ok(BulkResultData::Info(
366 crate::domain_info::DomainInfo::from_lookup_result(&result),
367 ))
368 }
369 }
370}
371
/// Extracts domain names from the text of a domain list or CSV file.
///
/// Each line is trimmed; blank lines and lines starting with `#` are ignored.
/// Of the remaining lines only the first comma-separated column is considered,
/// and it is kept when it contains a `.`.
///
/// A header row is only looked for on the first content line, and only when
/// that line is comma-separated and its first column contains no ASCII digit.
/// Plain one-domain-per-line input therefore never loses its first entry, and a
/// header whose first column has no dot (e.g. `domain,owner`) no longer causes
/// the first real domain to be dropped.
///
/// NOTE(review): a header-less CSV whose first domain has no digit is still
/// read as a header and skipped; telling the two apart needs information this
/// signature does not carry.
pub fn parse_domains_from_file(content: &str) -> Vec<String> {
    let mut rows = content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .peekable();

    // Decide on the raw first row, before any filtering, so a real domain further
    // down is never mistaken for the header.
    if rows.peek().map_or(false, |row| looks_like_csv_header(row)) {
        rows.next();
    }

    rows.map(first_csv_column)
        .filter(|domain| domain.contains('.'))
        .map(str::to_string)
        .collect()
}

/// Returns the trimmed first comma-separated column of `row`.
fn first_csv_column(row: &str) -> &str {
    row.split(',').next().unwrap_or(row).trim()
}

/// Heuristic: a comma-separated row whose first column has no ASCII digit.
fn looks_like_csv_header(row: &str) -> bool {
    row.contains(',') && !first_csv_column(row).chars().any(|c| c.is_ascii_digit())
}
393
#[cfg(test)]
mod tests {
    use super::*;

    /// Comments, blank lines, dotless entries and CSV rows whose first column
    /// has no dot are dropped; surrounding whitespace is trimmed.
    #[test]
    fn test_parse_domains_from_file() {
        let content = r#"
# This is a comment
example1.com
google2.com
 whitespace3.com
invalid
csv,format,example.org
"#;

        let parsed = parse_domains_from_file(content);

        assert_eq!(parsed.len(), 3);
        for expected in ["example1.com", "google2.com", "whitespace3.com"] {
            assert!(parsed.iter().any(|domain| domain == expected));
        }
    }

    /// A leading CSV header row is not reported as a domain.
    #[test]
    fn test_parse_domains_skip_header() {
        let content = "host.name,owner,notes\nexample.com,Alice,Main\n";

        assert_eq!(parse_domains_from_file(content), vec!["example.com"]);
    }
}