void_crawl_core 0.3.3

Rust-native CDP browser automation core — stealth-patched headless Chrome, profile leasing, captcha detection
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
//! `BrowserPool` — a pool of reusable browser tabs backed by long-lived Chrome
//! sessions.
//!
//! The pool creates tabs **lazily** on first `acquire()` and recycles them on
//! `release()`. Tabs are returned to the ready queue with no CDP call — the
//! next caller's `navigate(url)` overwrites prior content, giving near-instant
//! reuse. Hard recycling (close + reopen) kicks in after `tab_max_uses`, and
//! idle eviction cleans up stale tabs.
//!
//! `warmup()` is **optional** — calling it pre-creates tabs for faster first
//! acquires, but the pool works correctly without it.

use std::{
    collections::VecDeque,
    env, fmt,
    sync::{
        Arc, Mutex as StdMutex, PoisonError,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};

use futures::future;
use tokio::{
    sync::{Mutex, Semaphore},
    task::JoinHandle,
    time::{sleep, timeout},
};

use crate::{
    error::{Result, VoidCrawlError},
    page::Page,
    session::BrowserSession,
};

/// Configuration for a [`BrowserPool`].
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Number of Chrome processes (sessions) in the pool.
    pub browsers:             usize,
    /// Maximum concurrent tabs per browser session.
    pub tabs_per_browser:     usize,
    /// Close and reopen a tab after this many uses.
    pub tab_max_uses:         u32,
    /// Evict idle tabs after this many seconds.
    pub tab_max_idle_secs:    u64,
    /// Maximum seconds to wait for a tab in [`BrowserPool::acquire()`].
    ///
    /// When all tabs are checked out, `acquire()` blocks on the semaphore
    /// for at most this many seconds before returning
    /// [`VoidCrawlError::Timeout`].  `0` means wait indefinitely (the
    /// pre-v0.2 behaviour).
    pub acquire_timeout_secs: u64,
    /// Automatically run idle eviction in a background task.
    ///
    /// When `true` (the default), calling
    /// [`BrowserPool::spawn_eviction_task`] will start a Tokio task that
    /// calls [`BrowserPool::evict_idle`] every `tab_max_idle_secs / 2`
    /// seconds.  The task is cancelled when the handle returned by
    /// `spawn_eviction_task` is aborted (typically on pool close).
    pub auto_evict:           bool,
}

impl Default for PoolConfig {
    fn default() -> Self {
        Self {
            browsers:             1,
            tabs_per_browser:     4,
            tab_max_uses:         50,
            tab_max_idle_secs:    60,
            acquire_timeout_secs: 30,
            auto_evict:           true,
        }
    }
}

/// A tab checked out from the pool.
///
/// Holds the underlying [`Page`] plus bookkeeping metadata.
/// Return it to the pool via [`BrowserPool::release()`].
pub struct PooledTab {
    /// The CDP page / tab.
    pub page:               Page,
    /// How many times this tab has been used (incremented on release).
    pub use_count:          u32,
    /// When this tab was last returned to the ready queue.
    pub last_used:          Instant,
    /// Index into `BrowserPool::sessions` identifying which browser owns this
    /// tab.
    pub(crate) browser_idx: usize,
}

impl fmt::Debug for PooledTab {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PooledTab")
            .field("page", &self.page)
            .field("use_count", &self.use_count)
            .field("last_used", &self.last_used)
            .field("browser_idx", &self.browser_idx)
            .finish()
    }
}

/// A pool of reusable browser tabs spread across one or more Chrome sessions.
///
/// Tabs are created lazily on first `acquire()`. Call
/// [`warmup()`](Self::warmup) to optionally pre-create tabs for faster first
/// acquires.
///
/// # Usage
///
/// ```rust,no_run
/// # async fn example() -> void_crawl_core::Result<()> {
/// use void_crawl_core::pool::BrowserPool;
///
/// let pool = BrowserPool::from_env().await?;
/// // warmup() is optional — tabs are created on demand
///
/// let tab = pool.acquire().await?;
/// tab.page.navigate("https://example.com").await?;
/// let html = tab.page.content().await?;
/// pool.release(tab).await;
///
/// pool.close().await?;
/// # Ok(())
/// # }
/// ```
pub struct BrowserPool {
    sessions:      Vec<BrowserSession>,
    ready:         Mutex<VecDeque<PooledTab>>,
    semaphore:     Arc<Semaphore>,
    config:        PoolConfig,
    /// Round-robin counter for distributing new tabs across sessions.
    next_session:  AtomicUsize,
    /// Background eviction task handle.  `StdMutex` (not tokio) because we
    /// only set/take the handle in sync contexts — no `.await` inside the lock.
    eviction_task: StdMutex<Option<JoinHandle<()>>>,
}

impl fmt::Debug for BrowserPool {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BrowserPool")
            .field("config", &self.config)
            .field("sessions", &self.sessions.len())
            .finish_non_exhaustive()
    }
}

impl BrowserPool {
    /// Create a new pool from pre-built sessions and config.
    ///
    /// The pool starts with **no tabs** — they are created lazily on
    /// [`acquire()`](Self::acquire) or optionally pre-created via
    /// [`warmup()`](Self::warmup).
    pub fn new(config: PoolConfig, sessions: Vec<BrowserSession>) -> Self {
        let total_tabs = config.browsers * config.tabs_per_browser;
        Self {
            sessions,
            ready: Mutex::new(VecDeque::with_capacity(total_tabs)),
            // Permits = max concurrency. Tabs created lazily within this limit.
            semaphore: Arc::new(Semaphore::new(total_tabs)),
            config,
            next_session: AtomicUsize::new(0),
            eviction_task: StdMutex::new(None),
        }
    }

    /// Build a pool from environment variables.
    ///
    /// | Variable | Description | Default |
    /// |---|---|---|
    /// | `CHROME_WS_URLS` | Comma-separated `ws://` or `http://` URLs (connect mode) | — |
    /// | `BROWSER_COUNT` | Number of Chrome processes to launch | `1` |
    /// | `TABS_PER_BROWSER` | Max concurrent tabs per browser | `4` |
    /// | `TAB_MAX_USES` | Hard recycle threshold | `50` |
    /// | `TAB_MAX_IDLE_SECS` | Idle eviction timeout | `60` |
    /// | `CHROME_NO_SANDBOX` | Set to `"1"` to pass `--no-sandbox` | — |
    /// | `CHROME_HEADLESS` | Set to `"0"` for headful mode | `1` |
    /// | `ACQUIRE_TIMEOUT_SECS` | Max seconds to wait in acquire() | `30` |
    /// | `VIEWPORT_WIDTH` | Stealth viewport width | `1920` |
    /// | `VIEWPORT_HEIGHT` | Stealth viewport height | `1080` |
    /// | `CDP_PORT_BASE` | Pin Chrome's `--remote-debugging-port` for launched browsers. Browser `i` uses `base + i`. Unset = OS-assigned (recommended; can't conflict). | — |
    pub async fn from_env() -> Result<Self> {
        let tabs_per_browser: usize =
            env::var("TABS_PER_BROWSER").ok().and_then(|v| v.parse().ok()).unwrap_or(4);
        let tab_max_uses: u32 =
            env::var("TAB_MAX_USES").ok().and_then(|v| v.parse().ok()).unwrap_or(50);
        let tab_max_idle_secs: u64 =
            env::var("TAB_MAX_IDLE_SECS").ok().and_then(|v| v.parse().ok()).unwrap_or(60);
        let acquire_timeout_secs: u64 =
            env::var("ACQUIRE_TIMEOUT_SECS").ok().and_then(|v| v.parse().ok()).unwrap_or(30);
        let no_sandbox = env::var("CHROME_NO_SANDBOX").ok().is_some_and(|v| v == "1");
        let headless = env::var("CHROME_HEADLESS").ok().is_none_or(|v| v != "0");
        let viewport_width: Option<u32> =
            env::var("VIEWPORT_WIDTH").ok().and_then(|v| v.parse().ok());
        let viewport_height: Option<u32> =
            env::var("VIEWPORT_HEIGHT").ok().and_then(|v| v.parse().ok());
        // Leave `None` to use chromiumoxide's default (port 0 = OS-assigned),
        // which avoids every port-conflict failure mode. Only set this when
        // a firewall / container only exposes specific ports.
        let cdp_port_base: Option<u16> =
            env::var("CDP_PORT_BASE").ok().and_then(|v| v.parse().ok());

        let sessions = if let Ok(urls) = env::var("CHROME_WS_URLS") {
            // Connect mode: attach to pre-existing Chrome instances **in parallel**
            let futs: Vec<_> = urls
                .split(',')
                .map(str::trim)
                .filter(|u| !u.is_empty())
                .map(|url| BrowserSession::connect(url.to_string()))
                .collect();

            if futs.is_empty() {
                return Err(VoidCrawlError::Other(
                    "CHROME_WS_URLS is set but contains no valid URLs".into(),
                ));
            }

            let results = future::join_all(futs).await;
            results.into_iter().collect::<Result<Vec<_>>>()?
        } else {
            // Launch mode: start Chrome processes **in parallel**
            let browser_count: usize =
                env::var("BROWSER_COUNT").ok().and_then(|v| v.parse().ok()).unwrap_or(1);

            let futs: Vec<_> = (0..browser_count)
                .map(|i| {
                    let mut builder = if headless {
                        BrowserSession::builder().headless()
                    } else {
                        BrowserSession::builder().headful()
                    };
                    if no_sandbox {
                        builder = builder.no_sandbox();
                    }
                    if let (Some(w), Some(h)) = (viewport_width, viewport_height) {
                        builder = builder.viewport(w, h);
                    } else if let Some(w) = viewport_width {
                        builder = builder.viewport(w, 1080);
                    } else if let Some(h) = viewport_height {
                        builder = builder.viewport(1920, h);
                    }
                    // Browser N gets base+N so launching multiple browsers
                    // with a pinned base doesn't collide. `base + i` can
                    // overflow `u16::MAX`; clamp via `saturating_add` and
                    // let the OS reject it rather than wrap silently.
                    if let Some(base) = cdp_port_base {
                        builder =
                            builder.port(base.saturating_add(u16::try_from(i).unwrap_or(u16::MAX)));
                    }
                    builder.launch()
                })
                .collect();

            let results = future::join_all(futs).await;
            results.into_iter().collect::<Result<Vec<_>>>()?
        };

        let config = PoolConfig {
            browsers: sessions.len(),
            tabs_per_browser,
            tab_max_uses,
            tab_max_idle_secs,
            acquire_timeout_secs,
            auto_evict: true,
        };

        Ok(Self::new(config, sessions))
    }

    /// Pick the next session index (round-robin).
    fn next_browser_idx(&self) -> usize {
        if self.sessions.len() == 1 {
            return 0;
        }
        self.next_session.fetch_add(1, Ordering::Relaxed) % self.sessions.len()
    }

    /// Create a fresh tab on a round-robin browser session.
    async fn create_tab(&self) -> Result<PooledTab> {
        let idx = self.next_browser_idx();
        let page = self.sessions[idx].new_blank_page().await?;
        Ok(PooledTab { page, use_count: 0, last_used: Instant::now(), browser_idx: idx })
    }

    /// Optionally pre-open tabs across all sessions and fill the ready queue.
    ///
    /// Tabs are created **in parallel** across sessions, then inserted into
    /// the ready queue. Successful tabs are kept even when some fail; the
    /// first error (if any) is returned after all successful tabs are stored.
    ///
    /// This is **optional** — if not called, tabs are created lazily on
    /// first [`acquire()`](Self::acquire).
    pub async fn warmup(&self) -> Result<()> {
        // Build futures for all tabs across all sessions
        let mut futs = Vec::with_capacity(self.config.browsers * self.config.tabs_per_browser);
        for (idx, session) in self.sessions.iter().enumerate() {
            for _ in 0..self.config.tabs_per_browser {
                futs.push(async move {
                    let page = session.new_blank_page().await?;
                    Ok::<_, VoidCrawlError>(PooledTab {
                        page,
                        use_count: 0,
                        last_used: Instant::now(),
                        browser_idx: idx,
                    })
                });
            }
        }

        // Create all tabs in parallel
        let results = future::join_all(futs).await;

        // The semaphore tracks checked-out tabs, not ready-queue occupancy.
        // Tabs in the ready queue are "available" — they hold no permit.
        // Just push successful tabs directly; failed tabs are skipped and
        // their capacity remains available for lazy creation via acquire().
        let mut ready = self.ready.lock().await;
        let mut first_err: Option<VoidCrawlError> = None;
        for result in results {
            match result {
                Ok(tab) => ready.push_back(tab),
                Err(e) => {
                    if first_err.is_none() {
                        first_err = Some(e);
                    }
                }
            }
        }

        // If any tabs failed, report the first error but keep the
        // successfully created tabs in the pool.
        drop(ready);
        if let Some(e) = first_err { Err(e) } else { Ok(()) }
    }

    /// Check out a tab from the pool.
    ///
    /// If an idle tab is available, it is returned immediately. Otherwise,
    /// a new tab is created on demand (up to `tabs_per_browser * browsers`
    /// total). Blocks only when all tabs are currently in use.
    ///
    /// Tabs that have exceeded `tab_max_uses` are silently hard-recycled.
    ///
    /// The semaphore permit is returned on every error path so that
    /// failures never permanently shrink pool concurrency.
    pub async fn acquire(&self) -> Result<PooledTab> {
        self.acquire_timed().await.map(|(tab, _waited_ms)| tab)
    }

    /// Like [`acquire`](Self::acquire), but also returns the milliseconds
    /// spent blocked on the concurrency semaphore — the *pure queueing wait*,
    /// excluding any lazy tab-creation latency that happens after a permit is
    /// granted. A near-zero value means a slot was free immediately; a
    /// non-trivial value means the caller queued behind other in-flight work.
    /// Surfaced to MCP clients so an agent can tell when it has oversubscribed
    /// the pool and should throttle or cap a batch at `max_tabs`.
    pub async fn acquire_timed(&self) -> Result<(PooledTab, u64)> {
        let wait_start = Instant::now();
        let permit = if self.config.acquire_timeout_secs == 0 {
            // No timeout — wait indefinitely (legacy behaviour).
            self.semaphore
                .acquire()
                .await
                .map_err(|_| VoidCrawlError::Other("pool semaphore closed".into()))?
        } else {
            let deadline = Duration::from_secs(self.config.acquire_timeout_secs);
            match timeout(deadline, self.semaphore.acquire()).await {
                Ok(Ok(permit)) => permit,
                Ok(Err(_)) => {
                    return Err(VoidCrawlError::Other("pool semaphore closed".into()));
                }
                Err(_) => {
                    return Err(VoidCrawlError::Timeout(format!(
                        "pool.acquire() timed out after {}s — all {} tabs are checked out",
                        self.config.acquire_timeout_secs,
                        self.config.browsers * self.config.tabs_per_browser,
                    )));
                }
            }
        };
        // Pure semaphore queueing time — captured before the ready-queue pop
        // and any lazy `create_tab()` round-trip, so tab-creation latency is
        // never misreported as contention.
        let waited_ms = u64::try_from(wait_start.elapsed().as_millis()).unwrap_or(u64::MAX);
        // Don't auto-return the permit on drop — release() will add it back
        // on the success path. Error paths below must add_permits(1) manually.
        permit.forget();

        // Try the ready queue first (fast path: reuse an existing tab)
        let maybe_tab = {
            let mut ready = self.ready.lock().await;
            ready.pop_front()
        };

        let tab = match maybe_tab {
            Some(tab) => tab,
            // No idle tab — create one on demand (lazy growth)
            None => match self.create_tab().await {
                Ok(tab) => tab,
                Err(e) => {
                    self.semaphore.add_permits(1);
                    return Err(e);
                }
            },
        };

        // Hard recycle if this tab is worn out
        if tab.use_count >= self.config.tab_max_uses {
            let browser_idx = tab.browser_idx;
            let _ = tab.page.close().await;
            match self.sessions[browser_idx].new_blank_page().await {
                Ok(page) => {
                    return Ok((
                        PooledTab { page, use_count: 0, last_used: Instant::now(), browser_idx },
                        waited_ms,
                    ));
                }
                Err(e) => {
                    self.semaphore.add_permits(1);
                    return Err(e);
                }
            }
        }

        // No about:blank cleanup — the caller's navigate(url) will replace
        // the prior page content, and stealth scripts persist across navigations.
        // This saves 50-200ms of CDP round-trip per reused tab.
        Ok((tab, waited_ms))
    }

    /// Return a tab to the pool after use.
    ///
    /// Instant return — no CDP round-trip on the common path. The next caller's
    /// `navigate(url)` overwrites prior page content; stealth scripts persist
    /// across navigations.
    ///
    /// The one exception: if a download was armed on this tab and never
    /// completed (`arm_download` without a matching `wait`), we reset the CDP
    /// download behavior before recycling so the next caller doesn't inherit an
    /// `allowAndName` pointing at a since-deleted quarantine dir. This costs
    /// one CDP call only on the rare armed-but-abandoned path.
    pub async fn release(&self, mut tab: PooledTab) {
        tab.use_count += 1;
        tab.last_used = Instant::now();

        if tab.page.is_download_armed() {
            tab.page.reset_download_behavior().await;
        }

        self.ready.lock().await.push_back(tab);
        self.semaphore.add_permits(1);
    }

    /// Close idle tabs that have exceeded `tab_max_idle_secs` and open fresh
    /// replacements.
    ///
    /// Intended to be called periodically from a background tokio task.
    pub async fn evict_idle(&self) -> Result<()> {
        let max_idle = Duration::from_secs(self.config.tab_max_idle_secs);
        let now = Instant::now();

        // Partition the ready queue into keep vs. evict
        let to_evict: Vec<PooledTab> = {
            let mut ready = self.ready.lock().await;
            let mut keep = VecDeque::with_capacity(ready.len());
            let mut evict = Vec::new();

            while let Some(tab) = ready.pop_front() {
                if now.duration_since(tab.last_used) > max_idle {
                    evict.push(tab);
                } else {
                    keep.push_back(tab);
                }
            }
            *ready = keep;
            evict
        };

        // Close evicted tabs and create replacements in parallel.
        //
        // If the owning session is dead (handler exited), skip the
        // replacement — closing the old tab and failing to replace it
        // would permanently shrink the pool.
        let futs: Vec<_> = to_evict
            .into_iter()
            .map(|tab| {
                let browser_idx = tab.browser_idx;
                let session = &self.sessions[browser_idx];
                async move {
                    if !session.is_alive() {
                        // Session is dead — return the old tab unchanged so
                        // the pool doesn't lose capacity.
                        return Ok::<_, VoidCrawlError>(tab);
                    }
                    let _ = tab.page.close().await;
                    match session.new_blank_page().await {
                        Ok(page) => Ok(PooledTab {
                            page,
                            use_count: 0,
                            last_used: Instant::now(),
                            browser_idx,
                        }),
                        Err(e) => Err(e),
                    }
                }
            })
            .collect();

        let results = future::join_all(futs).await;
        let mut ready = self.ready.lock().await;
        let mut first_err: Option<VoidCrawlError> = None;
        for result in results {
            match result {
                Ok(tab) => ready.push_back(tab),
                // Replacement failed — the old tab is already closed, so
                // this slot is lost.  The semaphore still has its permit,
                // so acquire() can still create a tab on-demand if the
                // session recovers.
                Err(e) => {
                    if first_err.is_none() {
                        first_err = Some(e);
                    }
                }
            }
        }
        // Release the Mutex before returning, then surface the first error.
        drop(ready);
        if let Some(e) = first_err { Err(e) } else { Ok(()) }
    }

    /// Access the pool configuration.
    pub fn config(&self) -> &PoolConfig {
        &self.config
    }

    /// Free concurrency permits right now — how many more tabs could be
    /// acquired without queueing. `max_tabs - available_permits()` is the
    /// count currently checked out (in-flight fetches plus any held session
    /// tabs). A live snapshot an agent can read via `pool_status` to size a
    /// fan-out before submitting it.
    pub fn available_permits(&self) -> usize {
        self.semaphore.available_permits()
    }

    /// Start a background Tokio task that periodically calls
    /// [`evict_idle`](Self::evict_idle).
    ///
    /// **Idempotent** — if an eviction task is already running, this is a
    /// no-op.  The handle is stored inside the pool and cancelled
    /// automatically by [`close`](Self::close).  Call
    /// [`stop_eviction_task`](Self::stop_eviction_task) to cancel early.
    ///
    /// # Panics
    ///
    /// Must be called from within a Tokio runtime context.
    pub fn start_eviction_task(self: Arc<Self>) {
        // Recover from a poisoned mutex: if a previous panic occurred while
        // holding this lock, the inner value is still usable.
        let mut slot = self.eviction_task.lock().unwrap_or_else(PoisonError::into_inner);
        if slot.is_some() {
            return; // Already running — ignore duplicate call.
        }
        let pool = Arc::clone(&self);
        let interval = Duration::from_secs((self.config.tab_max_idle_secs / 2).max(1));
        let handle = tokio::spawn(async move {
            loop {
                sleep(interval).await;
                // Ignore errors — a single eviction failure (e.g. a tab
                // failing to close) should not kill the background task.
                let _ = pool.evict_idle().await;
            }
        });
        *slot = Some(handle);
    }

    /// Stop the background eviction task (if running).
    ///
    /// Called automatically by [`close`](Self::close).
    pub fn stop_eviction_task(&self) {
        let mut slot = self.eviction_task.lock().unwrap_or_else(PoisonError::into_inner);
        if let Some(task) = slot.take() {
            task.abort();
        }
    }

    /// Drain all tabs and close all browser sessions.
    ///
    /// Returns the first error encountered during tab or session teardown,
    /// but always attempts to close everything regardless of individual
    /// failures.
    pub async fn close(&self) -> Result<()> {
        // Stop the eviction task before closing so it doesn't race with tab
        // and session teardown.
        self.stop_eviction_task();
        // Drain the ready queue
        let tabs: Vec<PooledTab> = {
            let mut ready = self.ready.lock().await;
            ready.drain(..).collect()
        };

        let mut first_err: Option<VoidCrawlError> = None;

        // Close all tabs in parallel
        let tab_futs: Vec<_> = tabs.into_iter().map(|tab| tab.page.close()).collect();
        for result in future::join_all(tab_futs).await {
            if let Err(e) = result {
                if first_err.is_none() {
                    first_err = Some(e);
                }
            }
        }

        // Close all browser sessions in parallel
        let session_futs: Vec<_> = self.sessions.iter().map(BrowserSession::close).collect();
        for result in future::join_all(session_futs).await {
            if let Err(e) = result {
                if first_err.is_none() {
                    first_err = Some(e);
                }
            }
        }

        if let Some(e) = first_err { Err(e) } else { Ok(()) }
    }
}