1use crate::coalesce::{CoalesceKey, SolveCoalescer};
2use crate::cookie::Cookie;
3use crate::debug_replay::{DebugConfig, DebugSink};
4use crate::docker::{DockerLimits, DockerManager};
5use crate::error::AntibotError;
6use crate::metrics::{Metrics, MetricsSnapshot};
7use crate::proxy::ProxyConfig;
8use crate::request::SolveRequest;
9use crate::retry::RetryPolicy;
10use crate::session_cache::{extract_domain, SessionCache, SessionCacheConfig};
11use crate::types::{ApiResponse, Solution, SolutionSource};
12use crate::wire::WireRequest;
13use crate::Provider;
14use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15use std::sync::Mutex as StdMutex;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::task::JoinHandle;
19use tracing::{debug, error, info, warn};
20
21#[derive(Clone)]
23pub struct Antibot {
24 inner: Arc<AntibotInner>,
25}
26
27struct AntibotInner {
28 http: reqwest::Client,
29 instances: Vec<String>,
30 instance_cursor: AtomicUsize,
31 max_timeout_ms: u64,
32 default_proxy: Option<ProxyConfig>,
33 session_cache: Option<SessionCache>,
34 coalescer: Option<SolveCoalescer>,
35 retry_policy: RetryPolicy,
36 metrics: Metrics,
37 debug_sink: Option<DebugSink>,
38
39 docker_manager: Option<Arc<DockerManager>>,
41 manage_lifecycle: bool,
42 health_check_attempts_on_recovery: u32,
43 watchdog: StdMutex<Option<JoinHandle<()>>>,
44 shutdown: Arc<AtomicBool>,
45}
46
47impl Antibot {
48 pub fn builder() -> AntibotBuilder {
50 AntibotBuilder::default()
51 }
52
53 pub fn connect(base_url: &str) -> Self {
56 Self::connect_many(vec![base_url.to_string()])
57 }
58
59 pub fn connect_many(base_urls: Vec<String>) -> Self {
61 let instances: Vec<String> = base_urls
62 .into_iter()
63 .map(|u| u.trim_end_matches('/').to_string())
64 .collect();
65 assert!(!instances.is_empty(), "connect_many: empty instance list");
66
67 Self {
68 inner: Arc::new(AntibotInner {
69 http: build_http_client(),
70 instances,
71 instance_cursor: AtomicUsize::new(0),
72 max_timeout_ms: 60000,
73 default_proxy: None,
74 session_cache: None,
75 coalescer: None,
76 retry_policy: RetryPolicy::no_retries(),
77 metrics: Metrics::new(),
78 debug_sink: None,
79 docker_manager: None,
80 manage_lifecycle: false,
81 health_check_attempts_on_recovery: 15,
82 watchdog: StdMutex::new(None),
83 shutdown: Arc::new(AtomicBool::new(false)),
84 }),
85 }
86 }
87
88 pub async fn is_available(&self) -> bool {
90 for url in &self.inner.instances {
91 if matches!(self.inner.http.get(url).send().await, Ok(r) if r.status().is_success()) {
92 return true;
93 }
94 }
95 false
96 }
97
98 pub async fn solve(&self, url: &str) -> Result<Solution, AntibotError> {
100 self.execute(SolveRequest::get(url)).await
101 }
102
103 pub async fn solve_fresh(&self, url: &str) -> Result<Solution, AntibotError> {
105 self.execute(SolveRequest::get(url).bypass_cache()).await
106 }
107
108 pub fn invalidate_session(&self, domain: &str) {
110 if let Some(cache) = &self.inner.session_cache {
111 cache.invalidate(domain);
112 }
113 }
114
115 pub fn clear_session_cache(&self) {
117 if let Some(cache) = &self.inner.session_cache {
118 cache.clear();
119 }
120 }
121
122 pub fn session_cache_size(&self) -> usize {
124 self.inner
125 .session_cache
126 .as_ref()
127 .map(|c| c.len())
128 .unwrap_or(0)
129 }
130
131 pub fn metrics(&self) -> MetricsSnapshot {
133 self.inner.metrics.snapshot()
134 }
135
136 pub async fn execute(&self, mut request: SolveRequest) -> Result<Solution, AntibotError> {
138 if request.proxy.is_none() {
139 request.proxy = self.inner.default_proxy.clone();
140 }
141
142 let cacheable = request.session_id.is_none()
143 && !request.bypass_cache
144 && matches!(request.method, crate::request::SolveMethod::Get)
145 && request.cookies.is_none();
146
147 if cacheable {
148 if let Some(cache) = &self.inner.session_cache {
149 if let Some(domain) = extract_domain(&request.url) {
150 if let Some(hit) = cache.get(&domain) {
151 debug!("session cache hit for {}", domain);
152 self.inner.metrics.record_cache_hit();
153 let age = hit.age();
154 return Ok(Solution {
155 url: request.url.clone(),
156 status: 200,
157 cookies: hit.cookies,
158 user_agent: hit.user_agent,
159 response: None,
160 solved_at: hit.solved_at_system,
161 source: SolutionSource::Cached { age },
162 });
163 }
164 }
165 }
166 }
167
168 let coalesce_key = self
169 .inner
170 .coalescer
171 .as_ref()
172 .and_then(|c| c.key_for(&request.url));
173
174 let solver = || async { self.execute_uncoalesced(&request, cacheable).await };
175
176 match (&self.inner.coalescer, coalesce_key) {
177 (Some(coalescer), Some(key)) => {
178 self.inner.metrics.record_coalesced_wait();
179 coalescer.solve_or_wait(key, solver).await
180 }
181 _ => solver().await,
182 }
183 }
184
185 async fn execute_uncoalesced(
187 &self,
188 request: &SolveRequest,
189 cacheable: bool,
190 ) -> Result<Solution, AntibotError> {
191 let policy = &self.inner.retry_policy;
192 let mut last_err: Option<AntibotError> = None;
193
194 for attempt in 1..=policy.max_attempts {
195 let backoff = policy.backoff_for_attempt(attempt);
196 if !backoff.is_zero() {
197 tokio::time::sleep(backoff).await;
198 self.inner.metrics.record_retry();
199 }
200
201 self.inner.metrics.record_attempt();
202 let started = Instant::now();
203 match self.dispatch(request).await {
204 Ok(solution) => {
205 let elapsed = started.elapsed().as_millis() as u64;
206 self.inner.metrics.record_success(elapsed);
207
208 if cacheable {
209 if let Some(cache) = &self.inner.session_cache {
210 if let Some(domain) = extract_domain(&request.url) {
211 cache.insert(
212 domain,
213 solution.cookies.clone(),
214 solution.user_agent.clone(),
215 );
216 }
217 }
218 }
219
220 if let Some(sink) = &self.inner.debug_sink {
221 sink.write(&request.url, &solution).await;
222 }
223
224 return Ok(solution);
225 }
226 Err(e) => {
227 self.inner.metrics.record_failure();
228 let retryable = policy.is_retryable(&e) && attempt < policy.max_attempts;
229 if retryable {
230 warn!(
231 "solve attempt {}/{} failed: {} (retrying)",
232 attempt, policy.max_attempts, e
233 );
234 last_err = Some(e);
235 continue;
236 }
237 return Err(e);
238 }
239 }
240 }
241
242 Err(last_err.unwrap_or_else(|| {
243 AntibotError::UnexpectedResponse("retry loop exited without result".into())
244 }))
245 }
246
247 fn next_instance_url(&self) -> &str {
249 let n = self.inner.instances.len();
250 let idx = if n == 1 {
251 0
252 } else {
253 self.inner.instance_cursor.fetch_add(1, Ordering::Relaxed) % n
254 };
255 &self.inner.instances[idx]
256 }
257
258 async fn dispatch(&self, request: &SolveRequest) -> Result<Solution, AntibotError> {
259 let wire = WireRequest::from_solve(request, self.inner.max_timeout_ms);
260 let base = self.next_instance_url();
261
262 info!(
263 "[{}] solving {} ({} cookies pre-seeded, proxy={})",
264 base,
265 request.url,
266 request.cookies.as_ref().map(|c| c.len()).unwrap_or(0),
267 request.proxy.is_some()
268 );
269
270 let resp = self
271 .inner
272 .http
273 .post(format!("{}/v1", base))
274 .json(&wire)
275 .send()
276 .await?;
277
278 if !resp.status().is_success() {
279 let status = resp.status();
280 let body = resp.text().await.unwrap_or_default();
281 return Err(AntibotError::UnexpectedResponse(format!(
282 "HTTP {}: {}",
283 status,
284 &body[..body.len().min(500)]
285 )));
286 }
287
288 let api_resp: ApiResponse = resp.json().await?;
289
290 if api_resp.status != "ok" {
291 error!("challenge failed: {}", api_resp.message);
292 return Err(AntibotError::ChallengeFailed {
293 url: request.url.clone(),
294 reason: api_resp.message,
295 });
296 }
297
298 let wire_solution = api_resp.solution.ok_or_else(|| {
299 AntibotError::UnexpectedResponse("status ok but no solution returned".into())
300 })?;
301
302 let solution = Solution::from_wire(wire_solution);
303 debug!(
304 "solved with {} cookies, status={}",
305 solution.cookies.len(),
306 solution.status
307 );
308 info!("solved {} — status {}", request.url, solution.status);
309
310 Ok(solution)
311 }
312
313 pub async fn create_session(&self) -> Result<SessionHandle, AntibotError> {
315 self.create_session_with(None, None).await
316 }
317
318 pub async fn create_session_with(
319 &self,
320 session_id: Option<String>,
321 proxy: Option<ProxyConfig>,
322 ) -> Result<SessionHandle, AntibotError> {
323 let wire = WireRequest::sessions_create(session_id.clone(), proxy);
324 let base = self.next_instance_url().to_string();
325
326 let resp = self
327 .inner
328 .http
329 .post(format!("{}/v1", base))
330 .json(&wire)
331 .send()
332 .await?;
333
334 if !resp.status().is_success() {
335 let status = resp.status();
336 let body = resp.text().await.unwrap_or_default();
337 return Err(AntibotError::UnexpectedResponse(format!(
338 "HTTP {}: {}",
339 status,
340 &body[..body.len().min(500)]
341 )));
342 }
343
344 let api_resp: ApiResponse = resp.json().await?;
345 if api_resp.status != "ok" {
346 return Err(AntibotError::ChallengeFailed {
347 url: "<sessions.create>".to_string(),
348 reason: api_resp.message,
349 });
350 }
351
352 let id = api_resp
353 .session
354 .or(session_id)
355 .ok_or_else(|| AntibotError::UnexpectedResponse("no session id returned".into()))?;
356
357 info!("created session {}", id);
358
359 Ok(SessionHandle {
360 id,
361 antibot: self.clone(),
362 destroyed: false,
363 })
364 }
365
366 pub async fn destroy_session(&self, id: &str) -> Result<(), AntibotError> {
368 let wire = WireRequest::sessions_destroy(id.to_string());
369 let base = self.next_instance_url().to_string();
370
371 let resp = self
372 .inner
373 .http
374 .post(format!("{}/v1", base))
375 .json(&wire)
376 .send()
377 .await?;
378
379 if !resp.status().is_success() {
380 let status = resp.status();
381 let body = resp.text().await.unwrap_or_default();
382 return Err(AntibotError::UnexpectedResponse(format!(
383 "HTTP {}: {}",
384 status,
385 &body[..body.len().min(500)]
386 )));
387 }
388
389 let api_resp: ApiResponse = resp.json().await?;
390 if api_resp.status != "ok" {
391 return Err(AntibotError::SessionNotFound(api_resp.message));
392 }
393 info!("destroyed session {}", id);
394 Ok(())
395 }
396
397 pub fn build_http_client(user_agent: &str) -> Result<reqwest::Client, AntibotError> {
399 use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, ACCEPT_LANGUAGE};
400
401 let mut headers = HeaderMap::new();
402 headers.insert(
403 ACCEPT,
404 HeaderValue::from_static(
405 "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
406 ),
407 );
408 headers.insert(ACCEPT_LANGUAGE, HeaderValue::from_static("en-US,en;q=0.9"));
409
410 reqwest::Client::builder()
411 .user_agent(user_agent)
412 .default_headers(headers)
413 .timeout(std::time::Duration::from_secs(30))
414 .build()
415 .map_err(AntibotError::Http)
416 }
417
418 fn spawn_health_watchdog(&self, interval: Duration) {
422 let inner = self.inner.clone();
423
424 let handle = tokio::spawn(async move {
425 let watchdog_client = match reqwest::Client::builder()
426 .timeout(Duration::from_secs(5))
427 .build()
428 {
429 Ok(c) => c,
430 Err(e) => {
431 warn!("watchdog: failed to build http client: {}", e);
432 return;
433 }
434 };
435
436 loop {
437 if inner.shutdown.load(Ordering::Relaxed) {
438 return;
439 }
440
441 tokio::time::sleep(interval).await;
442
443 if inner.shutdown.load(Ordering::Relaxed) {
444 return;
445 }
446
447 let mut any_unhealthy = false;
448 for url in &inner.instances {
449 match watchdog_client.get(url).send().await {
450 Ok(r) if r.status().is_success() => {}
451 Ok(r) => {
452 warn!("watchdog: {} returned {}", url, r.status());
453 any_unhealthy = true;
454 }
455 Err(_) => {
456 warn!("watchdog: {} unreachable", url);
457 any_unhealthy = true;
458 }
459 }
460 }
461
462 if !any_unhealthy {
463 continue;
464 }
465
466 let Some(manager) = &inner.docker_manager else {
467 continue;
468 };
469
470 warn!(
471 "watchdog: restarting container '{}'",
472 manager.container_name()
473 );
474 inner.metrics.record_container_restart();
475
476 if let Err(e) = manager.start().await {
477 warn!("watchdog: failed to (re)start: {}", e);
478 continue;
479 }
480 if let Err(e) = manager
481 .wait_healthy(inner.health_check_attempts_on_recovery)
482 .await
483 {
484 warn!("watchdog: still unhealthy after restart: {}", e);
485 }
486 }
487 });
488
489 if let Ok(mut slot) = self.inner.watchdog.lock() {
490 *slot = Some(handle);
491 }
492 }
493}
494
495fn build_http_client() -> reqwest::Client {
496 reqwest::Client::builder()
497 .timeout(std::time::Duration::from_secs(120))
498 .build()
499 .expect("failed to build HTTP client")
500}
501
502impl Drop for AntibotInner {
503 fn drop(&mut self) {
504 self.shutdown.store(true, Ordering::Relaxed);
505
506 if let Ok(mut slot) = self.watchdog.lock() {
507 if let Some(handle) = slot.take() {
508 handle.abort();
509 }
510 }
511
512 if !self.manage_lifecycle {
513 return;
514 }
515 let Some(manager) = self.docker_manager.clone() else {
516 return;
517 };
518
519 match tokio::runtime::Handle::try_current() {
520 Ok(_) => {
521 tokio::spawn(async move {
522 if let Err(e) = manager.stop().await {
523 warn!("failed to stop container on drop: {}", e);
524 }
525 });
526 }
527 Err(_) => {
528 debug!("dropping outside tokio runtime; container will leak");
529 }
530 }
531 }
532}
533
534pub struct SessionHandle {
537 id: String,
538 antibot: Antibot,
539 destroyed: bool,
540}
541
542impl SessionHandle {
543 pub fn id(&self) -> &str {
544 &self.id
545 }
546
547 pub async fn execute(&self, request: SolveRequest) -> Result<Solution, AntibotError> {
548 let req = request.with_session(self.id.clone()).bypass_cache();
549 self.antibot.execute(req).await
550 }
551
552 pub async fn solve(&self, url: &str) -> Result<Solution, AntibotError> {
553 self.execute(SolveRequest::get(url)).await
554 }
555
556 pub async fn destroy(mut self) -> Result<(), AntibotError> {
557 self.destroyed = true;
558 self.antibot.destroy_session(&self.id).await
559 }
560}
561
562impl Drop for SessionHandle {
563 fn drop(&mut self) {
564 if self.destroyed {
565 return;
566 }
567 let id = self.id.clone();
568 let antibot = self.antibot.clone();
569 match tokio::runtime::Handle::try_current() {
570 Ok(_) => {
571 tokio::spawn(async move {
572 if let Err(e) = antibot.destroy_session(&id).await {
573 warn!("failed to destroy session {} on drop: {}", id, e);
574 }
575 });
576 }
577 Err(_) => {
578 debug!(
579 "session {} dropped outside a tokio runtime; provider will GC it",
580 id
581 );
582 }
583 }
584 }
585}
586
587pub fn merge_cookies(base: &mut Vec<Cookie>, extra: Vec<Cookie>) {
589 for c in extra {
590 if let Some(existing) = base
591 .iter_mut()
592 .find(|b| b.name == c.name && b.domain == c.domain)
593 {
594 *existing = c;
595 } else {
596 base.push(c);
597 }
598 }
599}
600
601pub struct AntibotBuilder {
603 provider: Provider,
604 port: u16,
605 auto_start: bool,
606 container_name: Option<String>,
607 max_timeout_ms: u64,
608 health_check_attempts: u32,
609 default_proxy: Option<ProxyConfig>,
610 session_cache_config: Option<SessionCacheConfig>,
611 coalesce_key: Option<CoalesceKey>,
612 retry_policy: RetryPolicy,
613 debug_config: Option<DebugConfig>,
614 docker_limits: DockerLimits,
615 extra_instances: Vec<String>,
616 manage_lifecycle: bool,
617 health_watch_interval: Option<Duration>,
618}
619
620impl Default for AntibotBuilder {
621 fn default() -> Self {
622 Self {
623 provider: Provider::Byparr,
624 port: 8191,
625 auto_start: false,
626 container_name: None,
627 max_timeout_ms: 60000,
628 health_check_attempts: 15,
629 default_proxy: None,
630 session_cache_config: None,
631 coalesce_key: None,
632 retry_policy: RetryPolicy::no_retries(),
633 debug_config: None,
634 docker_limits: DockerLimits::default(),
635 extra_instances: Vec::new(),
636 manage_lifecycle: false,
637 health_watch_interval: None,
638 }
639 }
640}
641
642impl AntibotBuilder {
643 pub fn provider(mut self, provider: Provider) -> Self {
644 self.provider = provider;
645 self
646 }
647
648 pub fn port(mut self, port: u16) -> Self {
649 self.port = port;
650 self
651 }
652
653 pub fn auto_start(mut self, enabled: bool) -> Self {
654 self.auto_start = enabled;
655 self
656 }
657
658 pub fn container_name(mut self, name: impl Into<String>) -> Self {
659 self.container_name = Some(name.into());
660 self
661 }
662
663 pub fn max_timeout_ms(mut self, ms: u64) -> Self {
664 self.max_timeout_ms = ms;
665 self
666 }
667
668 pub fn health_check_attempts(mut self, attempts: u32) -> Self {
669 self.health_check_attempts = attempts;
670 self
671 }
672
673 pub fn default_proxy(mut self, proxy: ProxyConfig) -> Self {
674 self.default_proxy = Some(proxy);
675 self
676 }
677
678 pub fn enable_session_cache(mut self) -> Self {
680 self.session_cache_config = Some(SessionCacheConfig::default());
681 self
682 }
683
684 pub fn session_cache(mut self, config: SessionCacheConfig) -> Self {
685 self.session_cache_config = Some(config);
686 self
687 }
688
689 pub fn coalesce_solves(mut self, key: CoalesceKey) -> Self {
691 self.coalesce_key = Some(key);
692 self
693 }
694
695 pub fn retry(mut self, policy: RetryPolicy) -> Self {
697 self.retry_policy = policy;
698 self
699 }
700
701 pub fn debug(mut self, config: DebugConfig) -> Self {
703 self.debug_config = Some(config);
704 self
705 }
706
707 pub fn docker_limits(mut self, limits: DockerLimits) -> Self {
709 self.docker_limits = limits;
710 self
711 }
712
713 pub fn add_instance(mut self, base_url: impl Into<String>) -> Self {
716 self.extra_instances
717 .push(base_url.into().trim_end_matches('/').to_string());
718 self
719 }
720
721 pub fn manage_lifecycle(mut self, enabled: bool) -> Self {
723 self.manage_lifecycle = enabled;
724 self
725 }
726
727 pub fn health_watch(mut self, interval: Duration) -> Self {
730 self.health_watch_interval = Some(interval);
731 self
732 }
733
734 pub async fn build(self) -> Result<Antibot, AntibotError> {
735 let primary_url = format!("http://localhost:{}", self.port);
736 let mut docker_manager: Option<Arc<DockerManager>> = None;
737
738 if self.auto_start {
739 let mut manager = DockerManager::new(self.provider, self.port);
740 if let Some(name) = self.container_name {
741 manager = manager.with_container_name(name);
742 }
743 manager = manager.with_limits(self.docker_limits);
744
745 if !manager.is_docker_available().await {
746 return Err(AntibotError::DockerNotAvailable);
747 }
748
749 manager.start().await?;
750 manager.wait_healthy(self.health_check_attempts).await?;
751 docker_manager = Some(Arc::new(manager));
752 }
753
754 let mut instances = vec![primary_url];
755 instances.extend(self.extra_instances);
756
757 let inner = AntibotInner {
758 http: build_http_client(),
759 instances,
760 instance_cursor: AtomicUsize::new(0),
761 max_timeout_ms: self.max_timeout_ms,
762 default_proxy: self.default_proxy,
763 session_cache: self.session_cache_config.map(SessionCache::new),
764 coalescer: self.coalesce_key.map(SolveCoalescer::new),
765 retry_policy: self.retry_policy,
766 metrics: Metrics::new(),
767 debug_sink: self.debug_config.map(DebugSink::new),
768 docker_manager,
769 manage_lifecycle: self.manage_lifecycle,
770 health_check_attempts_on_recovery: self.health_check_attempts,
771 watchdog: StdMutex::new(None),
772 shutdown: Arc::new(AtomicBool::new(false)),
773 };
774
775 let client = Antibot {
776 inner: Arc::new(inner),
777 };
778
779 if let Some(interval) = self.health_watch_interval {
780 if client.inner.docker_manager.is_some() {
781 client.spawn_health_watchdog(interval);
782 } else {
783 debug!("health_watch ignored: no docker_manager (auto_start disabled)");
784 }
785 }
786
787 Ok(client)
788 }
789}