Skip to main content

rustial_engine/io/
fetch_pool.rs

1//! Concurrent fetch pool with configurable concurrency and viewport-center
2//! priority ordering.
3//!
4//! # Purpose
5//!
6//! [`FetchPool`] sits between the engine (which decides *which* tiles to
7//! fetch) and the host-provided [`HttpClient`] (which does the actual I/O).
8//! It adds two things the raw client does not have:
9//!
10//! 1. **Concurrency limit** -- prevents flooding the network with hundreds
11//!    of requests at once when the user zooms out.
12//! 2. **Priority queue** -- tiles closest to the viewport center are sent
13//!    first, so the area the user is looking at loads before periphery.
14//!
15//! # Request lifecycle
16//!
17//! ```text
18//! enqueue(url, priority)       -- adds to priority queue
19//!    |
20//!    v
21//! +----------+  flush()   +------------+
22//! |  queue   | ---------> |  HttpClient |
23//! |  (heap)  |            | (in-flight) |
24//! +----------+            +------+------
25//!                                | poll()
26//!                                v
27//!                          completed results
28//! ```
29//!
30//! Call [`enqueue`](FetchPool::enqueue) to add requests, then
31//! [`flush`](FetchPool::flush) to dispatch up to the concurrency limit.
32//! [`poll`](FetchPool::poll) collects completed responses and
33//! automatically flushes freed slots.
34//!
35//! # Thread safety
36//!
37//! The pool is `Send + Sync`.  Internal state is protected by mutexes so
38//! it can be called from any thread.  Lock scopes are kept short to
39//! avoid contention.
40//!
41//! # Cancellation
42//!
43//! Call [`cancel`](FetchPool::cancel) to remove a queued (not yet sent)
44//! request.  Already in-flight requests cannot be cancelled at this level
45//! -- the `HttpClient` may or may not support that.
46
47use crate::io::{HttpClient, HttpRequest, HttpResponse};
48use std::cmp::Ordering;
49use std::collections::{BinaryHeap, HashSet};
50use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
51use std::sync::Mutex;
52
53// ---------------------------------------------------------------------------
54// Priority wrapper
55// ---------------------------------------------------------------------------
56
57/// A fetch request annotated with a priority score.
58///
59/// Lower `priority` values are dispatched first (nearest to viewport center).
60#[derive(Debug)]
61struct PrioritizedRequest {
62    request: HttpRequest,
63    /// Distance-like metric from the viewport center.
64    /// Lower = more important = dispatched sooner.
65    priority: f64,
66    sequence: u64,
67}
68
69impl PartialEq for PrioritizedRequest {
70    fn eq(&self, other: &Self) -> bool {
71        self.request.url == other.request.url
72    }
73}
74impl Eq for PrioritizedRequest {}
75
76impl PartialOrd for PrioritizedRequest {
77    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
78        Some(self.cmp(other))
79    }
80}
81
82impl Ord for PrioritizedRequest {
83    fn cmp(&self, other: &Self) -> Ordering {
84        // BinaryHeap is a max-heap.  We want lower priority values at the
85        // top, so we reverse the comparison.  NaN is treated as worst
86        // priority (pushed to the bottom).
87        other
88            .priority
89            .partial_cmp(&self.priority)
90            .unwrap_or(Ordering::Equal)
91            .then_with(|| other.sequence.cmp(&self.sequence))
92    }
93}
94
95// ---------------------------------------------------------------------------
96// FetchPool
97// ---------------------------------------------------------------------------
98
99/// A concurrency-limited, priority-ordered download scheduler.
100///
101/// Wraps an [`HttpClient`] and adds a bounded queue that dispatches
102/// the most important requests first.
103///
104/// # Example
105///
106/// ```rust,ignore
107/// use rustial_engine::{FetchPool, HttpClient};
108///
109/// let pool = FetchPool::new(my_http_client, 6);
110///
111/// // Batch enqueue -- no requests are sent yet.
112/// pool.enqueue("https://tile.example.com/10/512/340.png".into(), 0.0);
113/// pool.enqueue("https://tile.example.com/10/513/340.png".into(), 1.0);
114///
115/// // Dispatch the highest-priority requests up to the limit.
116/// pool.flush();
117///
118/// // Later, in the frame loop:
119/// let completed = pool.poll();  // also flushes freed slots
120/// for (url, result) in completed {
121///     // handle response
122/// }
123/// ```
124pub struct FetchPool {
125    /// The host-provided HTTP transport.
126    client: Box<dyn HttpClient>,
127
128    /// Maximum number of requests that may be in-flight simultaneously.
129    max_concurrent: usize,
130
131    /// Requests waiting to be dispatched, ordered by priority.
132    queue: Mutex<BinaryHeap<PrioritizedRequest>>,
133
134    /// URLs that are currently queued *or* in-flight (dedup set).
135    /// An entry is inserted on `enqueue` and removed when `poll`
136    /// returns the completed response, preventing duplicate requests
137    /// for the same URL.
138    known_urls: Mutex<HashSet<String>>,
139
140    /// URLs currently in-flight (sent to the HTTP client, awaiting response).
141    ///
142    /// Using a set of URLs instead of a plain counter eliminates
143    /// counting mismatches that arise when a deduplicating HTTP client
144    /// wrapper (e.g. [`SharedHttpClient`](crate::SharedHttpClient))
145    /// silently coalesces a re-sent URL with an existing in-flight
146    /// request.  With a set, `flush` inserts the URL (idempotent if
147    /// already present), `poll` removes it, and `force_cancel` removes
148    /// it immediately -- the set length is always accurate.
149    in_flight_urls: Mutex<HashSet<String>>,
150
151    /// URLs that were force-cancelled while already in-flight.
152    ///
153    /// When the HTTP response for such a URL arrives in [`poll`], it
154    /// is silently discarded (the URL was already removed from
155    /// `in_flight_urls` by `force_cancel`).  This set exists so that
156    /// `poll` can distinguish ghost responses from real completions.
157    cancelled_in_flight: Mutex<HashSet<String>>,
158
159    /// Monotonic insertion counter used to preserve FIFO ordering among
160    /// equal-priority requests.
161    sequence: AtomicU64,
162}
163
164impl FetchPool {
165    /// Create a new pool wrapping `client` with at most `max_concurrent`
166    /// simultaneous downloads.
167    ///
168    /// A `max_concurrent` of 0 is silently clamped to 1.
169    pub fn new(client: Box<dyn HttpClient>, max_concurrent: usize) -> Self {
170        Self {
171            client,
172            max_concurrent: max_concurrent.max(1),
173            queue: Mutex::new(BinaryHeap::new()),
174            known_urls: Mutex::new(HashSet::new()),
175            in_flight_urls: Mutex::new(HashSet::new()),
176            cancelled_in_flight: Mutex::new(HashSet::new()),
177            sequence: AtomicU64::new(0),
178        }
179    }
180
181    /// Add a URL to the priority queue.
182    ///
183    /// Lower `priority` = dispatched sooner.  Duplicate URLs (already
184    /// queued or in-flight) are silently ignored.
185    ///
186    /// The request is **not** sent until [`flush`](Self::flush) is called
187    /// (or implicitly via [`poll`](Self::poll)).  This allows the caller
188    /// to batch-enqueue a frame's worth of tiles before dispatching, so
189    /// the priority heap can sort them correctly.
190    pub fn enqueue(&self, request: HttpRequest, priority: f64) {
191        let url = request.url.clone();
192        let mut known = match self.known_urls.lock() {
193            Ok(u) => u,
194            Err(_) => return,
195        };
196        if !known.insert(url.clone()) {
197            return;
198        }
199        drop(known);
200
201        // If this URL was previously force-cancelled while in-flight but is
202        // now being re-enqueued, the eventual response should satisfy the new
203        // logical request rather than being discarded as a ghost.  Clear the
204        // ghost marker here so `poll` treats the next completion as real.
205        if let Ok(mut cancelled) = self.cancelled_in_flight.lock() {
206            cancelled.remove(&url);
207        }
208
209        if let Ok(mut queue) = self.queue.lock() {
210            queue.push(PrioritizedRequest {
211                request,
212                priority,
213                sequence: self.sequence.fetch_add(1, AtomicOrdering::Relaxed),
214            });
215        }
216    }
217
218    /// Dispatch queued requests up to the concurrency limit.
219    ///
220    /// The highest-priority (lowest score) requests are sent first.
221    /// Call this once per frame after all [`enqueue`](Self::enqueue)
222    /// calls for that frame.
223    pub fn flush(&self) {
224        let mut in_flight = match self.in_flight_urls.lock() {
225            Ok(f) => f,
226            Err(_) => return,
227        };
228        let mut queue = match self.queue.lock() {
229            Ok(q) => q,
230            Err(_) => return,
231        };
232
233        while in_flight.len() < self.max_concurrent {
234            match queue.pop() {
235                Some(req) => {
236                    let url = req.request.url.clone();
237                    self.client.send(req.request);
238                    in_flight.insert(url);
239                }
240                None => break,
241            }
242        }
243    }
244
245    /// Remove a URL from the pending queue if it has not been sent yet.
246    ///
247    /// Returns `true` if the URL was found and removed.  Already in-flight
248    /// requests are not affected.
249    pub fn cancel(&self, url: &str) -> bool {
250        let in_queue = self
251            .queue
252            .lock()
253            .is_ok_and(|q| q.iter().any(|r| r.request.url == url));
254        if !in_queue {
255            return false;
256        }
257
258        // Remove from the dedup set.
259        if let Ok(mut known) = self.known_urls.lock() {
260            known.remove(url);
261        }
262
263        // Rebuild the heap without the cancelled URL.  O(n) but
264        // cancellation is infrequent.
265        if let Ok(mut queue) = self.queue.lock() {
266            let old: Vec<_> = queue.drain().collect();
267            for req in old {
268                if req.request.url != url {
269                    queue.push(req);
270                }
271            }
272        }
273        true
274    }
275
276    /// Remove a URL from the dedup set regardless of whether it is
277    /// queued or in-flight.
278    ///
279    /// If the URL is still in the queue it is also removed from the
280    /// heap.  If it is already in-flight, the concurrency slot is
281    /// reclaimed immediately so that new requests can be dispatched
282    /// without waiting for the ghost HTTP response to arrive.  The
283    /// URL is recorded in an internal set so that [`poll`](Self::poll)
284    /// does not double-decrement `in_flight` when the ghost response
285    /// eventually completes.
286    ///
287    /// This allows the same URL to be re-enqueued immediately, which
288    /// is essential when the tile manager cancels a pending tile and
289    /// then re-requests it on a subsequent frame.
290    pub fn force_cancel(&self, url: &str) {
291        // Check whether the URL is still in the queue (not yet sent).
292        let was_queued = self
293            .queue
294            .lock()
295            .is_ok_and(|q| q.iter().any(|r| r.request.url == url));
296
297        // Always remove from the dedup set so re-enqueue is possible.
298        if let Ok(mut known) = self.known_urls.lock() {
299            known.remove(url);
300        }
301
302        if was_queued {
303            // Remove from the heap -- the request was never sent.
304            if let Ok(mut queue) = self.queue.lock() {
305                let old: Vec<_> = queue.drain().collect();
306                for req in old {
307                    if req.request.url != url {
308                        queue.push(req);
309                    }
310                }
311            }
312        } else {
313            // The URL is in-flight.  Immediately reclaim the concurrency
314            // slot so new requests can be dispatched without waiting for
315            // the ghost HTTP response to arrive.
316            if let Ok(mut in_flight) = self.in_flight_urls.lock() {
317                in_flight.remove(url);
318            }
319            // Record the ghost so `poll` knows the eventual response is
320            // orphaned and should not be forwarded to the caller.
321            if let Ok(mut cancelled) = self.cancelled_in_flight.lock() {
322                cancelled.insert(url.to_owned());
323            }
324        }
325    }
326
327    /// Discard all queued (not yet sent) requests.
328    ///
329    /// In-flight requests are unaffected -- they will still appear in
330    /// future [`poll`](Self::poll) results.
331    pub fn clear_queue(&self) {
332        if let Ok(mut queue) = self.queue.lock() {
333            // Remove queued URLs from the known set so they can be
334            // re-enqueued later if needed.
335            if let Ok(mut known) = self.known_urls.lock() {
336                for req in queue.iter() {
337                    known.remove(&req.request.url);
338                }
339            }
340            queue.clear();
341        }
342    }
343
344    /// Number of requests waiting in the queue (not yet sent).
345    pub fn queued_count(&self) -> usize {
346        self.queue.lock().map(|q| q.len()).unwrap_or(0)
347    }
348
349    /// Maximum number of concurrent in-flight requests.
350    #[inline]
351    pub fn max_concurrent(&self) -> usize {
352        self.max_concurrent
353    }
354
355    /// Number of requests currently in-flight (sent, awaiting response).
356    pub fn in_flight_count(&self) -> usize {
357        self.in_flight_urls.lock().map(|g| g.len()).unwrap_or(0)
358    }
359
360    /// Number of URLs currently tracked by the dedup set.
361    pub fn known_count(&self) -> usize {
362        self.known_urls.lock().map(|k| k.len()).unwrap_or(0)
363    }
364
365    /// Number of URLs recorded as force-cancelled while in-flight.
366    pub fn cancelled_in_flight_count(&self) -> usize {
367        self.cancelled_in_flight
368            .lock()
369            .map(|set| set.len())
370            .unwrap_or(0)
371    }
372
373    /// Returns `true` if `url` is known to the pool (queued or in-flight).
374    pub fn is_known(&self, url: &str) -> bool {
375        self.known_urls.lock().is_ok_and(|k| k.contains(url))
376    }
377
378    /// Poll the underlying [`HttpClient`] for completed responses and
379    /// release their concurrency slots.
380    ///
381    /// Any freed slots are immediately filled from the priority queue
382    /// (implicit flush).
383    ///
384    /// Returns `(url, Result<HttpResponse, String>)` for each completed
385    /// request.
386    pub fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
387        let results = self.client.poll();
388
389        if !results.is_empty() {
390            // Clean up ghost responses (force-cancelled while in-flight)
391            // and release concurrency slots for real completions.
392            let mut cancelled = self.cancelled_in_flight.lock().ok();
393            let mut in_flight = self.in_flight_urls.lock().ok();
394
395            for (url, _) in &results {
396                // If this URL was force-cancelled, it is a ghost -- the
397                // slot was already freed in force_cancel.  Just clean up
398                // the tracking set.
399                if let Some(ref mut set) = cancelled {
400                    if set.remove(url.as_str()) {
401                        continue;
402                    }
403                }
404                // Real completion: free the concurrency slot.
405                if let Some(ref mut urls) = in_flight {
406                    urls.remove(url.as_str());
407                }
408            }
409            drop(cancelled);
410            drop(in_flight);
411
412            // Remove completed URLs from the dedup set.
413            if let Ok(mut known) = self.known_urls.lock() {
414                for (url, _) in &results {
415                    known.remove(url);
416                }
417            }
418        }
419
420        // Fill any freed slots from the queue.
421        self.flush();
422
423        results
424    }
425}
426
427// ---------------------------------------------------------------------------
428// Tests
429// ---------------------------------------------------------------------------
430
431#[cfg(test)]
432mod tests {
433    use super::*;
434    use crate::io::shared_http_client::SharedHttpClient;
435    use std::sync::Arc;
436
437    // -- Helpers ----------------------------------------------------------
438
439    /// Mock that instantly completes every request on the next `poll`.
440    struct InstantMockClient {
441        sent: Mutex<Vec<String>>,
442    }
443
444    impl InstantMockClient {
445        fn new() -> Self {
446            Self {
447                sent: Mutex::new(Vec::new()),
448            }
449        }
450    }
451
452    impl HttpClient for InstantMockClient {
453        fn send(&self, request: HttpRequest) {
454            self.sent.lock().unwrap().push(request.url);
455        }
456
457        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
458            let sent = std::mem::take(&mut *self.sent.lock().unwrap());
459            sent.into_iter()
460                .map(|url| {
461                    (
462                        url,
463                        Ok(HttpResponse {
464                            status: 200,
465                            body: Vec::new(),
466                            headers: Vec::new(),
467                        }),
468                    )
469                })
470                .collect()
471        }
472    }
473
474    /// Mock that records send order but never completes.
475    struct DeferredMockClient {
476        sent: Mutex<Vec<String>>,
477    }
478
479    impl DeferredMockClient {
480        fn new() -> Self {
481            Self {
482                sent: Mutex::new(Vec::new()),
483            }
484        }
485    }
486
487    impl HttpClient for DeferredMockClient {
488        fn send(&self, request: HttpRequest) {
489            self.sent.lock().unwrap().push(request.url);
490        }
491
492        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
493            Vec::new()
494        }
495    }
496
497    /// Shared mock for inspecting send order after handing ownership
498    /// to FetchPool.
499    struct SharedMock(Arc<Mutex<Vec<String>>>);
500
501    impl HttpClient for SharedMock {
502        fn send(&self, request: HttpRequest) {
503            self.0.lock().unwrap().push(request.url);
504        }
505        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
506            Vec::new()
507        }
508    }
509
510    #[derive(Clone, Default)]
511    #[allow(clippy::type_complexity)]
512    struct RecordingClient {
513        sent: Arc<Mutex<Vec<String>>>,
514        responses: Arc<Mutex<Vec<(String, Result<HttpResponse, String>)>>>,
515    }
516
517    impl RecordingClient {
518        fn sent_urls(&self) -> Vec<String> {
519            self.sent.lock().unwrap().clone()
520        }
521
522        fn complete(&self, url: &str) {
523            self.responses.lock().unwrap().push((
524                url.to_owned(),
525                Ok(HttpResponse {
526                    status: 200,
527                    body: Vec::new(),
528                    headers: Vec::new(),
529                }),
530            ));
531        }
532    }
533
534    impl HttpClient for RecordingClient {
535        fn send(&self, request: HttpRequest) {
536            self.sent.lock().unwrap().push(request.url);
537        }
538
539        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
540            std::mem::take(&mut *self.responses.lock().unwrap())
541        }
542    }
543
544    // -- Concurrency limit ------------------------------------------------
545
546    #[test]
547    fn respects_concurrency_limit() {
548        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);
549        pool.enqueue(HttpRequest::get("a"), 1.0);
550        pool.enqueue(HttpRequest::get("b"), 2.0);
551        pool.enqueue(HttpRequest::get("c"), 3.0);
552        pool.flush();
553
554        // Only 2 sent, 1 remains queued.
555        assert_eq!(pool.in_flight_count(), 2);
556        assert_eq!(pool.queued_count(), 1);
557    }
558
559    #[test]
560    fn freed_slots_dispatch_queued() {
561        let pool = FetchPool::new(Box::new(InstantMockClient::new()), 1);
562        pool.enqueue(HttpRequest::get("a"), 1.0);
563        pool.enqueue(HttpRequest::get("b"), 2.0);
564        pool.flush();
565
566        // 'a' sent (lower priority value), 'b' queued.
567        assert_eq!(pool.in_flight_count(), 1);
568        assert_eq!(pool.queued_count(), 1);
569
570        // Poll completes 'a', implicit flush sends 'b'.
571        let results = pool.poll();
572        assert_eq!(results.len(), 1);
573        assert_eq!(results[0].0, "a");
574        assert_eq!(pool.queued_count(), 0);
575        assert_eq!(pool.in_flight_count(), 1); // 'b' now in-flight
576    }
577
578    // -- Priority ordering ------------------------------------------------
579
580    #[test]
581    fn priority_order_nearest_first() {
582        let sent = Arc::new(Mutex::new(Vec::new()));
583        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);
584
585        // Enqueue in non-priority order.
586        pool.enqueue(HttpRequest::get("far"), 100.0);
587        pool.enqueue(HttpRequest::get("near"), 1.0);
588        pool.enqueue(HttpRequest::get("mid"), 50.0);
589
590        // Flush dispatches all three; heap sorts by priority.
591        pool.flush();
592
593        let order = sent.lock().unwrap().clone();
594        assert_eq!(order.len(), 3);
595        assert_eq!(order[0], "near");
596        assert_eq!(order[1], "mid");
597        assert_eq!(order[2], "far");
598    }
599
600    #[test]
601    fn equal_priority_preserves_enqueue_order() {
602        let sent = Arc::new(Mutex::new(Vec::new()));
603        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);
604
605        pool.enqueue(HttpRequest::get("a"), 1.0);
606        pool.enqueue(HttpRequest::get("b"), 1.0);
607        pool.enqueue(HttpRequest::get("c"), 1.0);
608        pool.flush();
609
610        let order = sent.lock().unwrap().clone();
611        assert_eq!(order, vec!["a", "b", "c"]);
612    }
613
614    // -- Duplicate suppression --------------------------------------------
615
616    #[test]
617    fn duplicate_enqueue_ignored() {
618        let sent = Arc::new(Mutex::new(Vec::new()));
619        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);
620        pool.enqueue(HttpRequest::get("a"), 1.0);
621        pool.enqueue(HttpRequest::get("a"), 2.0); // duplicate -- silently dropped
622        pool.flush();
623
624        assert_eq!(pool.in_flight_count(), 1);
625        assert_eq!(sent.lock().unwrap().len(), 1);
626    }
627
628    #[test]
629    fn duplicate_suppressed_while_in_flight() {
630        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
631        pool.enqueue(HttpRequest::get("a"), 1.0);
632        pool.flush(); // 'a' is now in-flight
633
634        // Try to enqueue 'a' again while it is in-flight.
635        pool.enqueue(HttpRequest::get("a"), 2.0);
636        assert_eq!(pool.queued_count(), 0, "should not re-queue in-flight URL");
637    }
638
639    #[test]
640    fn can_re_enqueue_after_completion() {
641        let pool = FetchPool::new(Box::new(InstantMockClient::new()), 10);
642        pool.enqueue(HttpRequest::get("a"), 1.0);
643        pool.flush();
644
645        // Complete it.
646        let results = pool.poll();
647        assert_eq!(results.len(), 1);
648
649        // Now re-enqueue should work.
650        pool.enqueue(HttpRequest::get("a"), 1.0);
651        pool.flush();
652        assert_eq!(pool.in_flight_count(), 1);
653    }
654
655    // -- Cancel -----------------------------------------------------------
656
657    #[test]
658    fn cancel_removes_queued_request() {
659        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
660        pool.enqueue(HttpRequest::get("a"), 1.0);
661        pool.enqueue(HttpRequest::get("b"), 2.0);
662        pool.enqueue(HttpRequest::get("c"), 3.0);
663        pool.flush();
664
665        // 'a' sent (priority 1.0), 'b' and 'c' queued.
666        assert_eq!(pool.queued_count(), 2);
667        assert!(pool.cancel("b"));
668        assert_eq!(pool.queued_count(), 1);
669    }
670
671    #[test]
672    fn cancel_nonexistent_returns_false() {
673        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
674        assert!(!pool.cancel("nope"));
675    }
676
677    #[test]
678    fn cancel_in_flight_returns_false() {
679        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
680        pool.enqueue(HttpRequest::get("a"), 1.0);
681        pool.flush();
682        // 'a' is in-flight, not queued.
683        assert!(!pool.cancel("a"));
684    }
685
686    // -- Clear queue ------------------------------------------------------
687
688    #[test]
689    fn clear_queue_discards_pending() {
690        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
691        pool.enqueue(HttpRequest::get("a"), 1.0);
692        pool.enqueue(HttpRequest::get("b"), 2.0);
693        pool.enqueue(HttpRequest::get("c"), 3.0);
694        pool.flush();
695
696        assert_eq!(pool.queued_count(), 2);
697        pool.clear_queue();
698        assert_eq!(pool.queued_count(), 0);
699        // In-flight request is unaffected.
700        assert_eq!(pool.in_flight_count(), 1);
701    }
702
703    #[test]
704    fn cleared_urls_can_be_re_enqueued() {
705        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
706        pool.enqueue(HttpRequest::get("a"), 1.0);
707        pool.enqueue(HttpRequest::get("b"), 2.0);
708        pool.flush(); // sends 'a', 'b' queued
709        pool.clear_queue(); // drops 'b' from queue and known set
710
711        pool.enqueue(HttpRequest::get("b"), 2.0); // should succeed
712        assert_eq!(pool.queued_count(), 1);
713    }
714
715    // -- Zero-concurrency edge case ---------------------------------------
716
717    #[test]
718    fn zero_concurrency_clamped_to_one() {
719        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 0);
720        pool.enqueue(HttpRequest::get("a"), 1.0);
721        pool.flush();
722        assert_eq!(pool.in_flight_count(), 1);
723    }
724
725    // -- Empty pool -------------------------------------------------------
726
727    #[test]
728    fn poll_empty_returns_empty() {
729        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
730        assert!(pool.poll().is_empty());
731        assert_eq!(pool.in_flight_count(), 0);
732        assert_eq!(pool.queued_count(), 0);
733    }
734
735    #[test]
736    fn flush_empty_is_noop() {
737        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
738        pool.flush(); // should not panic
739        assert_eq!(pool.in_flight_count(), 0);
740    }
741
742    // -- Force-cancel ghost slot reclamation ------------------------------
743
744    #[test]
745    fn force_cancel_immediately_reclaims_in_flight_slot() {
746        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);
747
748        pool.enqueue(HttpRequest::get("a"), 1.0);
749        pool.enqueue(HttpRequest::get("b"), 2.0);
750        pool.enqueue(HttpRequest::get("c"), 3.0);
751        pool.flush();
752
753        // 'a' and 'b' in-flight, 'c' queued.
754        assert_eq!(pool.in_flight_count(), 2);
755        assert_eq!(pool.queued_count(), 1);
756
757        // Force-cancel 'a' while it is in-flight.
758        pool.force_cancel("a");
759
760        // The slot should be reclaimed immediately.
761        assert_eq!(
762            pool.in_flight_count(),
763            1,
764            "ghost slot must be reclaimed immediately"
765        );
766        assert_eq!(pool.cancelled_in_flight_count(), 1);
767
768        // Flush should now dispatch 'c' into the freed slot.
769        pool.flush();
770        assert_eq!(
771            pool.in_flight_count(),
772            2,
773            "freed slot should accept queued request"
774        );
775        assert_eq!(pool.queued_count(), 0);
776    }
777
778    #[test]
779    fn ghost_response_does_not_double_decrement_in_flight() {
780        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);
781
782        pool.enqueue(HttpRequest::get("a"), 1.0);
783        pool.enqueue(HttpRequest::get("b"), 2.0);
784        pool.flush();
785        assert_eq!(pool.in_flight_count(), 2);
786
787        // Force-cancel 'a': slot reclaimed immediately.
788        pool.force_cancel("a");
789        assert_eq!(pool.in_flight_count(), 1);
790        assert_eq!(pool.cancelled_in_flight_count(), 1);
791
792        // poll returns nothing (DeferredMockClient), so ghost persists.
793        let _ = pool.poll();
794        assert_eq!(pool.in_flight_count(), 1);
795        assert_eq!(
796            pool.cancelled_in_flight_count(),
797            1,
798            "ghost persists until response arrives"
799        );
800    }
801
802    #[test]
803    fn force_cancel_allows_re_enqueue_of_in_flight_url() {
804        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
805        pool.enqueue(HttpRequest::get("a"), 1.0);
806        pool.flush();
807        assert_eq!(pool.in_flight_count(), 1);
808
809        pool.force_cancel("a");
810        assert_eq!(pool.in_flight_count(), 0);
811
812        // Should be able to re-enqueue immediately.
813        pool.enqueue(HttpRequest::get("a"), 1.0);
814        pool.flush();
815        assert_eq!(pool.in_flight_count(), 1);
816    }
817
818    #[test]
819    fn reenqueue_after_force_cancel_with_shared_dedup_completes_cleanly() {
820        let inner = RecordingClient::default();
821        let shared = SharedHttpClient::new(Box::new(inner.clone()));
822        let pool = FetchPool::new(Box::new(shared), 2);
823
824        pool.enqueue(HttpRequest::get("a"), 1.0);
825        pool.enqueue(HttpRequest::get("b"), 2.0);
826        pool.flush();
827        assert_eq!(pool.in_flight_count(), 2);
828        assert_eq!(inner.sent_urls(), vec!["a".to_string(), "b".to_string()]);
829
830        // Cancel the in-flight request and immediately re-enqueue the same URL.
831        pool.force_cancel("a");
832        assert_eq!(pool.in_flight_count(), 1);
833        assert_eq!(pool.cancelled_in_flight_count(), 1);
834
835        pool.enqueue(HttpRequest::get("a"), 0.5);
836        pool.flush();
837
838        // SharedHttpClient should dedup the resend against the original
839        // in-flight request, so no second network send for "a" occurs.
840        assert_eq!(pool.in_flight_count(), 2);
841        assert_eq!(inner.sent_urls(), vec!["a".to_string(), "b".to_string()]);
842        assert_eq!(
843            pool.cancelled_in_flight_count(),
844            0,
845            "re-enqueue should clear ghost marker"
846        );
847
848        // When the original response for "a" arrives, it must satisfy the new
849        // logical request rather than being discarded as a ghost.
850        inner.complete("a");
851        let results = pool.poll();
852        assert_eq!(results.len(), 1);
853        assert_eq!(results[0].0, "a");
854        assert_eq!(
855            pool.in_flight_count(),
856            1,
857            "completion should retire the re-enqueued URL"
858        );
859    }
860}