Skip to main content

antibot_rs/
client.rs

1use crate::coalesce::{CoalesceKey, SolveCoalescer};
2use crate::cookie::Cookie;
3use crate::debug_replay::{DebugConfig, DebugSink};
4use crate::docker::{DockerLimits, DockerManager};
5use crate::error::AntibotError;
6use crate::metrics::{Metrics, MetricsSnapshot};
7use crate::proxy::ProxyConfig;
8use crate::request::SolveRequest;
9use crate::retry::RetryPolicy;
10use crate::session_cache::{extract_domain, SessionCache, SessionCacheConfig};
11use crate::types::{ApiResponse, Solution, SolutionSource};
12use crate::wire::WireRequest;
13use crate::Provider;
14use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15use std::sync::Mutex as StdMutex;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::task::JoinHandle;
19use tracing::{debug, error, info, warn};
20
/// Client for solving bot-detection challenges via Byparr/FlareSolverr.
///
/// Cloning is cheap: every clone shares the same inner state (HTTP client,
/// instance pool, caches, metrics, lifecycle handles) through an `Arc`.
#[derive(Clone)]
pub struct Antibot {
    // Shared state; see `AntibotInner`.
    inner: Arc<AntibotInner>,
}
26
/// State shared by every [`Antibot`] clone.
///
/// When the last `Arc` reference is dropped, this type's `Drop` impl sets
/// `shutdown`, aborts the watchdog task, and (if `manage_lifecycle` is set)
/// schedules a stop of the managed container.
struct AntibotInner {
    /// HTTP client used for all provider calls.
    http: reqwest::Client,
    /// Provider base URLs without trailing `/`; index 0 is the primary instance.
    instances: Vec<String>,
    /// Monotonic counter used for round-robin instance selection.
    instance_cursor: AtomicUsize,
    /// Timeout in milliseconds forwarded with each solve request.
    max_timeout_ms: u64,
    /// Proxy applied to requests that do not specify one.
    default_proxy: Option<ProxyConfig>,
    /// Per-domain cache of solved sessions; `None` disables caching.
    session_cache: Option<SessionCache>,
    /// Deduplicates concurrent solves sharing a key; `None` disables coalescing.
    coalescer: Option<SolveCoalescer>,
    /// Policy controlling how many attempts a solve gets and the backoff between them.
    retry_policy: RetryPolicy,
    /// Atomic counters exposed via `Antibot::metrics`.
    metrics: Metrics,
    /// Optional sink that receives every successful solution.
    debug_sink: Option<DebugSink>,

    // Lifecycle / health
    /// Present only when the builder auto-started a container.
    docker_manager: Option<Arc<DockerManager>>,
    /// Whether dropping the client should stop the managed container.
    manage_lifecycle: bool,
    /// Health-check attempts used after a watchdog-triggered restart.
    health_check_attempts_on_recovery: u32,
    /// Handle of the health watchdog task, if one was spawned.
    watchdog: StdMutex<Option<JoinHandle<()>>>,
    /// Set on drop to tell the watchdog loop to exit.
    shutdown: Arc<AtomicBool>,
}
46
47impl Antibot {
48    /// Create a builder for configuring the client.
49    pub fn builder() -> AntibotBuilder {
50        AntibotBuilder::default()
51    }
52
53    /// Quick constructor that connects to an already-running instance.
54    /// Does NOT auto-start Docker.
55    pub fn connect(base_url: &str) -> Self {
56        Self::connect_many(vec![base_url.to_string()])
57    }
58
59    /// Connect to a pool of instances. Requests round-robin across them.
60    pub fn connect_many(base_urls: Vec<String>) -> Self {
61        let instances: Vec<String> = base_urls
62            .into_iter()
63            .map(|u| u.trim_end_matches('/').to_string())
64            .collect();
65        assert!(!instances.is_empty(), "connect_many: empty instance list");
66
67        Self {
68            inner: Arc::new(AntibotInner {
69                http: build_http_client(),
70                instances,
71                instance_cursor: AtomicUsize::new(0),
72                max_timeout_ms: 60000,
73                default_proxy: None,
74                session_cache: None,
75                coalescer: None,
76                retry_policy: RetryPolicy::no_retries(),
77                metrics: Metrics::new(),
78                debug_sink: None,
79                docker_manager: None,
80                manage_lifecycle: false,
81                health_check_attempts_on_recovery: 15,
82                watchdog: StdMutex::new(None),
83                shutdown: Arc::new(AtomicBool::new(false)),
84            }),
85        }
86    }
87
88    /// Check if the underlying service is reachable on at least one instance.
89    pub async fn is_available(&self) -> bool {
90        for url in &self.inner.instances {
91            if matches!(self.inner.http.get(url).send().await, Ok(r) if r.status().is_success()) {
92                return true;
93            }
94        }
95        false
96    }
97
98    /// Convenience: solve a URL with a simple GET.
99    pub async fn solve(&self, url: &str) -> Result<Solution, AntibotError> {
100        self.execute(SolveRequest::get(url)).await
101    }
102
103    /// Force a fresh solve, bypassing the session cache for this URL.
104    pub async fn solve_fresh(&self, url: &str) -> Result<Solution, AntibotError> {
105        self.execute(SolveRequest::get(url).bypass_cache()).await
106    }
107
108    /// Manually drop the cached session for `domain`.
109    pub fn invalidate_session(&self, domain: &str) {
110        if let Some(cache) = &self.inner.session_cache {
111            cache.invalidate(domain);
112        }
113    }
114
115    /// Drop every cached session.
116    pub fn clear_session_cache(&self) {
117        if let Some(cache) = &self.inner.session_cache {
118            cache.clear();
119        }
120    }
121
122    /// Number of cached sessions, or 0 if caching is disabled.
123    pub fn session_cache_size(&self) -> usize {
124        self.inner
125            .session_cache
126            .as_ref()
127            .map(|c| c.len())
128            .unwrap_or(0)
129    }
130
131    /// Snapshot of all atomic counters.
132    pub fn metrics(&self) -> MetricsSnapshot {
133        self.inner.metrics.snapshot()
134    }
135
136    /// Unified entry point: cache check → coalesce → retry-wrapped dispatch → cache write.
137    pub async fn execute(&self, mut request: SolveRequest) -> Result<Solution, AntibotError> {
138        if request.proxy.is_none() {
139            request.proxy = self.inner.default_proxy.clone();
140        }
141
142        let cacheable = request.session_id.is_none()
143            && !request.bypass_cache
144            && matches!(request.method, crate::request::SolveMethod::Get)
145            && request.cookies.is_none();
146
147        if cacheable {
148            if let Some(cache) = &self.inner.session_cache {
149                if let Some(domain) = extract_domain(&request.url) {
150                    if let Some(hit) = cache.get(&domain) {
151                        debug!("session cache hit for {}", domain);
152                        self.inner.metrics.record_cache_hit();
153                        let age = hit.age();
154                        return Ok(Solution {
155                            url: request.url.clone(),
156                            status: 200,
157                            cookies: hit.cookies,
158                            user_agent: hit.user_agent,
159                            response: None,
160                            solved_at: hit.solved_at_system,
161                            source: SolutionSource::Cached { age },
162                        });
163                    }
164                }
165            }
166        }
167
168        let coalesce_key = self
169            .inner
170            .coalescer
171            .as_ref()
172            .and_then(|c| c.key_for(&request.url));
173
174        let solver = || async { self.execute_uncoalesced(&request, cacheable).await };
175
176        match (&self.inner.coalescer, coalesce_key) {
177            (Some(coalescer), Some(key)) => {
178                self.inner.metrics.record_coalesced_wait();
179                coalescer.solve_or_wait(key, solver).await
180            }
181            _ => solver().await,
182        }
183    }
184
185    /// Hit the provider with retries, update the session cache on success.
186    async fn execute_uncoalesced(
187        &self,
188        request: &SolveRequest,
189        cacheable: bool,
190    ) -> Result<Solution, AntibotError> {
191        let policy = &self.inner.retry_policy;
192        let mut last_err: Option<AntibotError> = None;
193
194        for attempt in 1..=policy.max_attempts {
195            let backoff = policy.backoff_for_attempt(attempt);
196            if !backoff.is_zero() {
197                tokio::time::sleep(backoff).await;
198                self.inner.metrics.record_retry();
199            }
200
201            self.inner.metrics.record_attempt();
202            let started = Instant::now();
203            match self.dispatch(request).await {
204                Ok(solution) => {
205                    let elapsed = started.elapsed().as_millis() as u64;
206                    self.inner.metrics.record_success(elapsed);
207
208                    if cacheable {
209                        if let Some(cache) = &self.inner.session_cache {
210                            if let Some(domain) = extract_domain(&request.url) {
211                                cache.insert(
212                                    domain,
213                                    solution.cookies.clone(),
214                                    solution.user_agent.clone(),
215                                );
216                            }
217                        }
218                    }
219
220                    if let Some(sink) = &self.inner.debug_sink {
221                        sink.write(&request.url, &solution).await;
222                    }
223
224                    return Ok(solution);
225                }
226                Err(e) => {
227                    self.inner.metrics.record_failure();
228                    let retryable = policy.is_retryable(&e) && attempt < policy.max_attempts;
229                    if retryable {
230                        warn!(
231                            "solve attempt {}/{} failed: {} (retrying)",
232                            attempt, policy.max_attempts, e
233                        );
234                        last_err = Some(e);
235                        continue;
236                    }
237                    return Err(e);
238                }
239            }
240        }
241
242        Err(last_err.unwrap_or_else(|| {
243            AntibotError::UnexpectedResponse("retry loop exited without result".into())
244        }))
245    }
246
247    /// Pick the next instance round-robin and round-trip to its `/v1`.
248    fn next_instance_url(&self) -> &str {
249        let n = self.inner.instances.len();
250        let idx = if n == 1 {
251            0
252        } else {
253            self.inner.instance_cursor.fetch_add(1, Ordering::Relaxed) % n
254        };
255        &self.inner.instances[idx]
256    }
257
258    async fn dispatch(&self, request: &SolveRequest) -> Result<Solution, AntibotError> {
259        let wire = WireRequest::from_solve(request, self.inner.max_timeout_ms);
260        let base = self.next_instance_url();
261
262        info!(
263            "[{}] solving {} ({} cookies pre-seeded, proxy={})",
264            base,
265            request.url,
266            request.cookies.as_ref().map(|c| c.len()).unwrap_or(0),
267            request.proxy.is_some()
268        );
269
270        let resp = self
271            .inner
272            .http
273            .post(format!("{}/v1", base))
274            .json(&wire)
275            .send()
276            .await?;
277
278        if !resp.status().is_success() {
279            let status = resp.status();
280            let body = resp.text().await.unwrap_or_default();
281            return Err(AntibotError::UnexpectedResponse(format!(
282                "HTTP {}: {}",
283                status,
284                &body[..body.len().min(500)]
285            )));
286        }
287
288        let api_resp: ApiResponse = resp.json().await?;
289
290        if api_resp.status != "ok" {
291            error!("challenge failed: {}", api_resp.message);
292            return Err(AntibotError::ChallengeFailed {
293                url: request.url.clone(),
294                reason: api_resp.message,
295            });
296        }
297
298        let wire_solution = api_resp.solution.ok_or_else(|| {
299            AntibotError::UnexpectedResponse("status ok but no solution returned".into())
300        })?;
301
302        let solution = Solution::from_wire(wire_solution);
303        debug!(
304            "solved with {} cookies, status={}",
305            solution.cookies.len(),
306            solution.status
307        );
308        info!("solved {} — status {}", request.url, solution.status);
309
310        Ok(solution)
311    }
312
313    /// Create a persistent browser session on the provider.
314    pub async fn create_session(&self) -> Result<SessionHandle, AntibotError> {
315        self.create_session_with(None, None).await
316    }
317
318    pub async fn create_session_with(
319        &self,
320        session_id: Option<String>,
321        proxy: Option<ProxyConfig>,
322    ) -> Result<SessionHandle, AntibotError> {
323        let wire = WireRequest::sessions_create(session_id.clone(), proxy);
324        let base = self.next_instance_url().to_string();
325
326        let resp = self
327            .inner
328            .http
329            .post(format!("{}/v1", base))
330            .json(&wire)
331            .send()
332            .await?;
333
334        if !resp.status().is_success() {
335            let status = resp.status();
336            let body = resp.text().await.unwrap_or_default();
337            return Err(AntibotError::UnexpectedResponse(format!(
338                "HTTP {}: {}",
339                status,
340                &body[..body.len().min(500)]
341            )));
342        }
343
344        let api_resp: ApiResponse = resp.json().await?;
345        if api_resp.status != "ok" {
346            return Err(AntibotError::ChallengeFailed {
347                url: "<sessions.create>".to_string(),
348                reason: api_resp.message,
349            });
350        }
351
352        let id = api_resp
353            .session
354            .or(session_id)
355            .ok_or_else(|| AntibotError::UnexpectedResponse("no session id returned".into()))?;
356
357        info!("created session {}", id);
358
359        Ok(SessionHandle {
360            id,
361            antibot: self.clone(),
362            destroyed: false,
363        })
364    }
365
366    /// Tear down a provider session by id.
367    pub async fn destroy_session(&self, id: &str) -> Result<(), AntibotError> {
368        let wire = WireRequest::sessions_destroy(id.to_string());
369        let base = self.next_instance_url().to_string();
370
371        let resp = self
372            .inner
373            .http
374            .post(format!("{}/v1", base))
375            .json(&wire)
376            .send()
377            .await?;
378
379        if !resp.status().is_success() {
380            let status = resp.status();
381            let body = resp.text().await.unwrap_or_default();
382            return Err(AntibotError::UnexpectedResponse(format!(
383                "HTTP {}: {}",
384                status,
385                &body[..body.len().min(500)]
386            )));
387        }
388
389        let api_resp: ApiResponse = resp.json().await?;
390        if api_resp.status != "ok" {
391            return Err(AntibotError::SessionNotFound(api_resp.message));
392        }
393        info!("destroyed session {}", id);
394        Ok(())
395    }
396
397    /// Build a reqwest `Client` pre-configured with a solved user-agent.
398    pub fn build_http_client(user_agent: &str) -> Result<reqwest::Client, AntibotError> {
399        use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, ACCEPT_LANGUAGE};
400
401        let mut headers = HeaderMap::new();
402        headers.insert(
403            ACCEPT,
404            HeaderValue::from_static(
405                "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
406            ),
407        );
408        headers.insert(ACCEPT_LANGUAGE, HeaderValue::from_static("en-US,en;q=0.9"));
409
410        reqwest::Client::builder()
411            .user_agent(user_agent)
412            .default_headers(headers)
413            .timeout(std::time::Duration::from_secs(30))
414            .build()
415            .map_err(AntibotError::Http)
416    }
417
418    /// Spawn a watchdog task that pings each instance every `interval` and
419    /// triggers a container restart on failure (only when this client owns
420    /// the lifecycle).
421    fn spawn_health_watchdog(&self, interval: Duration) {
422        let inner = self.inner.clone();
423
424        let handle = tokio::spawn(async move {
425            let watchdog_client = match reqwest::Client::builder()
426                .timeout(Duration::from_secs(5))
427                .build()
428            {
429                Ok(c) => c,
430                Err(e) => {
431                    warn!("watchdog: failed to build http client: {}", e);
432                    return;
433                }
434            };
435
436            loop {
437                if inner.shutdown.load(Ordering::Relaxed) {
438                    return;
439                }
440
441                tokio::time::sleep(interval).await;
442
443                if inner.shutdown.load(Ordering::Relaxed) {
444                    return;
445                }
446
447                let mut any_unhealthy = false;
448                for url in &inner.instances {
449                    match watchdog_client.get(url).send().await {
450                        Ok(r) if r.status().is_success() => {}
451                        Ok(r) => {
452                            warn!("watchdog: {} returned {}", url, r.status());
453                            any_unhealthy = true;
454                        }
455                        Err(_) => {
456                            warn!("watchdog: {} unreachable", url);
457                            any_unhealthy = true;
458                        }
459                    }
460                }
461
462                if !any_unhealthy {
463                    continue;
464                }
465
466                let Some(manager) = &inner.docker_manager else {
467                    continue;
468                };
469
470                warn!(
471                    "watchdog: restarting container '{}'",
472                    manager.container_name()
473                );
474                inner.metrics.record_container_restart();
475
476                if let Err(e) = manager.start().await {
477                    warn!("watchdog: failed to (re)start: {}", e);
478                    continue;
479                }
480                if let Err(e) = manager
481                    .wait_healthy(inner.health_check_attempts_on_recovery)
482                    .await
483                {
484                    warn!("watchdog: still unhealthy after restart: {}", e);
485                }
486            }
487        });
488
489        if let Ok(mut slot) = self.inner.watchdog.lock() {
490            *slot = Some(handle);
491        }
492    }
493}
494
495fn build_http_client() -> reqwest::Client {
496    reqwest::Client::builder()
497        .timeout(std::time::Duration::from_secs(120))
498        .build()
499        .expect("failed to build HTTP client")
500}
501
502impl Drop for AntibotInner {
503    fn drop(&mut self) {
504        self.shutdown.store(true, Ordering::Relaxed);
505
506        if let Ok(mut slot) = self.watchdog.lock() {
507            if let Some(handle) = slot.take() {
508                handle.abort();
509            }
510        }
511
512        if !self.manage_lifecycle {
513            return;
514        }
515        let Some(manager) = self.docker_manager.clone() else {
516            return;
517        };
518
519        match tokio::runtime::Handle::try_current() {
520            Ok(_) => {
521                tokio::spawn(async move {
522                    if let Err(e) = manager.stop().await {
523                        warn!("failed to stop container on drop: {}", e);
524                    }
525                });
526            }
527            Err(_) => {
528                debug!("dropping outside tokio runtime; container will leak");
529            }
530        }
531    }
532}
533
/// Handle to a provider-side persistent session. Drops auto-destroy the session
/// on a background task; call [`SessionHandle::destroy`] explicitly to await.
///
/// Auto-destroy on drop only happens when the handle is dropped inside a Tokio
/// runtime; otherwise the session is left for the provider to clean up.
pub struct SessionHandle {
    /// Session id reported by the provider, or the one requested at creation.
    id: String,
    /// Client used to run requests in, and finally destroy, this session.
    antibot: Antibot,
    /// Set by `destroy()` so `Drop` does not schedule a second teardown.
    destroyed: bool,
}
541
542impl SessionHandle {
543    pub fn id(&self) -> &str {
544        &self.id
545    }
546
547    pub async fn execute(&self, request: SolveRequest) -> Result<Solution, AntibotError> {
548        let req = request.with_session(self.id.clone()).bypass_cache();
549        self.antibot.execute(req).await
550    }
551
552    pub async fn solve(&self, url: &str) -> Result<Solution, AntibotError> {
553        self.execute(SolveRequest::get(url)).await
554    }
555
556    pub async fn destroy(mut self) -> Result<(), AntibotError> {
557        self.destroyed = true;
558        self.antibot.destroy_session(&self.id).await
559    }
560}
561
562impl Drop for SessionHandle {
563    fn drop(&mut self) {
564        if self.destroyed {
565            return;
566        }
567        let id = self.id.clone();
568        let antibot = self.antibot.clone();
569        match tokio::runtime::Handle::try_current() {
570            Ok(_) => {
571                tokio::spawn(async move {
572                    if let Err(e) = antibot.destroy_session(&id).await {
573                        warn!("failed to destroy session {} on drop: {}", id, e);
574                    }
575                });
576            }
577            Err(_) => {
578                debug!(
579                    "session {} dropped outside a tokio runtime; provider will GC it",
580                    id
581                );
582            }
583        }
584    }
585}
586
587/// Apply additional cookies to an existing solution.
588pub fn merge_cookies(base: &mut Vec<Cookie>, extra: Vec<Cookie>) {
589    for c in extra {
590        if let Some(existing) = base
591            .iter_mut()
592            .find(|b| b.name == c.name && b.domain == c.domain)
593        {
594            *existing = c;
595        } else {
596            base.push(c);
597        }
598    }
599}
600
/// Builder for configuring and initializing an [`Antibot`] client.
///
/// Obtain one with [`Antibot::builder`], chain setters, then `build().await`.
pub struct AntibotBuilder {
    /// Provider handed to the Docker manager when auto-starting.
    provider: Provider,
    /// Port of the primary instance (`http://localhost:{port}`).
    port: u16,
    /// Start a local container (and wait for it to be healthy) in `build`.
    auto_start: bool,
    /// Custom container name; `None` keeps the Docker manager's default.
    container_name: Option<String>,
    /// Timeout in milliseconds forwarded with each solve request.
    max_timeout_ms: u64,
    /// Health-check attempts at startup and after a watchdog restart.
    health_check_attempts: u32,
    /// Proxy used by requests that do not set one.
    default_proxy: Option<ProxyConfig>,
    /// `Some` enables the per-domain session cache.
    session_cache_config: Option<SessionCacheConfig>,
    /// `Some` enables coalescing of concurrent solves.
    coalesce_key: Option<CoalesceKey>,
    /// Retry policy wrapped around each provider call.
    retry_policy: RetryPolicy,
    /// `Some` enables the debug sink.
    debug_config: Option<DebugConfig>,
    /// Resource caps applied to an auto-started container.
    docker_limits: DockerLimits,
    /// Additional pre-existing instance URLs (trailing `/` stripped).
    extra_instances: Vec<String>,
    /// Stop the auto-started container when the client is dropped.
    manage_lifecycle: bool,
    /// `Some` spawns a health watchdog (only effective with `auto_start`).
    health_watch_interval: Option<Duration>,
}
619
620impl Default for AntibotBuilder {
621    fn default() -> Self {
622        Self {
623            provider: Provider::Byparr,
624            port: 8191,
625            auto_start: false,
626            container_name: None,
627            max_timeout_ms: 60000,
628            health_check_attempts: 15,
629            default_proxy: None,
630            session_cache_config: None,
631            coalesce_key: None,
632            retry_policy: RetryPolicy::no_retries(),
633            debug_config: None,
634            docker_limits: DockerLimits::default(),
635            extra_instances: Vec::new(),
636            manage_lifecycle: false,
637            health_watch_interval: None,
638        }
639    }
640}
641
642impl AntibotBuilder {
643    pub fn provider(mut self, provider: Provider) -> Self {
644        self.provider = provider;
645        self
646    }
647
648    pub fn port(mut self, port: u16) -> Self {
649        self.port = port;
650        self
651    }
652
653    pub fn auto_start(mut self, enabled: bool) -> Self {
654        self.auto_start = enabled;
655        self
656    }
657
658    pub fn container_name(mut self, name: impl Into<String>) -> Self {
659        self.container_name = Some(name.into());
660        self
661    }
662
663    pub fn max_timeout_ms(mut self, ms: u64) -> Self {
664        self.max_timeout_ms = ms;
665        self
666    }
667
668    pub fn health_check_attempts(mut self, attempts: u32) -> Self {
669        self.health_check_attempts = attempts;
670        self
671    }
672
673    pub fn default_proxy(mut self, proxy: ProxyConfig) -> Self {
674        self.default_proxy = Some(proxy);
675        self
676    }
677
678    /// Enable session caching with default config (30 min TTL, 1000 entries).
679    pub fn enable_session_cache(mut self) -> Self {
680        self.session_cache_config = Some(SessionCacheConfig::default());
681        self
682    }
683
684    pub fn session_cache(mut self, config: SessionCacheConfig) -> Self {
685        self.session_cache_config = Some(config);
686        self
687    }
688
689    /// Coalesce concurrent solves that share the same key.
690    pub fn coalesce_solves(mut self, key: CoalesceKey) -> Self {
691        self.coalesce_key = Some(key);
692        self
693    }
694
695    /// Wrap each provider call in a retry policy.
696    pub fn retry(mut self, policy: RetryPolicy) -> Self {
697        self.retry_policy = policy;
698        self
699    }
700
701    /// Enable disk dumps of every solved page.
702    pub fn debug(mut self, config: DebugConfig) -> Self {
703        self.debug_config = Some(config);
704        self
705    }
706
707    /// Apply Docker resource caps when creating a managed container.
708    pub fn docker_limits(mut self, limits: DockerLimits) -> Self {
709        self.docker_limits = limits;
710        self
711    }
712
713    /// Add an additional pre-existing instance URL. Combined with the
714    /// `auto_start`/`port` instance, requests round-robin across all of them.
715    pub fn add_instance(mut self, base_url: impl Into<String>) -> Self {
716        self.extra_instances
717            .push(base_url.into().trim_end_matches('/').to_string());
718        self
719    }
720
721    /// Stop the spawned container when the client is dropped.
722    pub fn manage_lifecycle(mut self, enabled: bool) -> Self {
723        self.manage_lifecycle = enabled;
724        self
725    }
726
727    /// Run a background watchdog that restarts the container if a health check
728    /// fails. Only takes effect when `auto_start` is on.
729    pub fn health_watch(mut self, interval: Duration) -> Self {
730        self.health_watch_interval = Some(interval);
731        self
732    }
733
734    pub async fn build(self) -> Result<Antibot, AntibotError> {
735        let primary_url = format!("http://localhost:{}", self.port);
736        let mut docker_manager: Option<Arc<DockerManager>> = None;
737
738        if self.auto_start {
739            let mut manager = DockerManager::new(self.provider, self.port);
740            if let Some(name) = self.container_name {
741                manager = manager.with_container_name(name);
742            }
743            manager = manager.with_limits(self.docker_limits);
744
745            if !manager.is_docker_available().await {
746                return Err(AntibotError::DockerNotAvailable);
747            }
748
749            manager.start().await?;
750            manager.wait_healthy(self.health_check_attempts).await?;
751            docker_manager = Some(Arc::new(manager));
752        }
753
754        let mut instances = vec![primary_url];
755        instances.extend(self.extra_instances);
756
757        let inner = AntibotInner {
758            http: build_http_client(),
759            instances,
760            instance_cursor: AtomicUsize::new(0),
761            max_timeout_ms: self.max_timeout_ms,
762            default_proxy: self.default_proxy,
763            session_cache: self.session_cache_config.map(SessionCache::new),
764            coalescer: self.coalesce_key.map(SolveCoalescer::new),
765            retry_policy: self.retry_policy,
766            metrics: Metrics::new(),
767            debug_sink: self.debug_config.map(DebugSink::new),
768            docker_manager,
769            manage_lifecycle: self.manage_lifecycle,
770            health_check_attempts_on_recovery: self.health_check_attempts,
771            watchdog: StdMutex::new(None),
772            shutdown: Arc::new(AtomicBool::new(false)),
773        };
774
775        let client = Antibot {
776            inner: Arc::new(inner),
777        };
778
779        if let Some(interval) = self.health_watch_interval {
780            if client.inner.docker_manager.is_some() {
781                client.spawn_health_watchdog(interval);
782            } else {
783                debug!("health_watch ignored: no docker_manager (auto_start disabled)");
784            }
785        }
786
787        Ok(client)
788    }
789}