Skip to main content

rustial_engine/io/
fetch_pool.rs

1//! Concurrent fetch pool with configurable concurrency and viewport-center
2//! priority ordering.
3//!
4//! # Purpose
5//!
6//! [`FetchPool`] sits between the engine (which decides *which* tiles to
7//! fetch) and the host-provided [`HttpClient`] (which does the actual I/O).
8//! It adds two things the raw client does not have:
9//!
10//! 1. **Concurrency limit** -- prevents flooding the network with hundreds
11//!    of requests at once when the user zooms out.
12//! 2. **Priority queue** -- tiles closest to the viewport center are sent
13//!    first, so the area the user is looking at loads before periphery.
14//!
15//! # Request lifecycle
16//!
17//! ```text
18//! enqueue(url, priority)       -- adds to priority queue
19//!    |
20//!    v
21//! +----------+  flush()   +------------+
22//! |  queue   | ---------> |  HttpClient |
23//! |  (heap)  |            | (in-flight) |
24//! +----------+            +------+------
25//!                                | poll()
26//!                                v
27//!                          completed results
28//! ```
29//!
30//! Call [`enqueue`](FetchPool::enqueue) to add requests, then
31//! [`flush`](FetchPool::flush) to dispatch up to the concurrency limit.
32//! [`poll`](FetchPool::poll) collects completed responses and
33//! automatically flushes freed slots.
34//!
35//! # Thread safety
36//!
37//! The pool is `Send + Sync`.  Internal state is protected by mutexes so
38//! it can be called from any thread.  Lock scopes are kept short to
39//! avoid contention.
40//!
41//! # Cancellation
42//!
43//! Call [`cancel`](FetchPool::cancel) to remove a queued (not yet sent)
44//! request.  Already in-flight requests cannot be cancelled at this level
45//! -- the `HttpClient` may or may not support that.
46
47use crate::io::{HttpClient, HttpRequest, HttpResponse};
48use std::cmp::Ordering;
49use std::collections::{BinaryHeap, HashSet};
50use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
51use std::sync::Mutex;
52
53// ---------------------------------------------------------------------------
54// Priority wrapper
55// ---------------------------------------------------------------------------
56
57/// A fetch request annotated with a priority score.
58///
59/// Lower `priority` values are dispatched first (nearest to viewport center).
60#[derive(Debug)]
61struct PrioritizedRequest {
62    request: HttpRequest,
63    /// Distance-like metric from the viewport center.
64    /// Lower = more important = dispatched sooner.
65    priority: f64,
66    sequence: u64,
67}
68
69impl PartialEq for PrioritizedRequest {
70    fn eq(&self, other: &Self) -> bool {
71        self.request.url == other.request.url
72    }
73}
74impl Eq for PrioritizedRequest {}
75
76impl PartialOrd for PrioritizedRequest {
77    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
78        Some(self.cmp(other))
79    }
80}
81
82impl Ord for PrioritizedRequest {
83    fn cmp(&self, other: &Self) -> Ordering {
84        // BinaryHeap is a max-heap.  We want lower priority values at the
85        // top, so we reverse the comparison.  NaN is treated as worst
86        // priority (pushed to the bottom).
87        other
88            .priority
89            .partial_cmp(&self.priority)
90            .unwrap_or(Ordering::Equal)
91            .then_with(|| other.sequence.cmp(&self.sequence))
92    }
93}
94
95// ---------------------------------------------------------------------------
96// FetchPool
97// ---------------------------------------------------------------------------
98
99/// A concurrency-limited, priority-ordered download scheduler.
100///
101/// Wraps an [`HttpClient`] and adds a bounded queue that dispatches
102/// the most important requests first.
103///
104/// # Example
105///
106/// ```rust,ignore
107/// use rustial_engine::{FetchPool, HttpClient};
108///
109/// let pool = FetchPool::new(my_http_client, 6);
110///
111/// // Batch enqueue -- no requests are sent yet.
112/// pool.enqueue("https://tile.example.com/10/512/340.png".into(), 0.0);
113/// pool.enqueue("https://tile.example.com/10/513/340.png".into(), 1.0);
114///
115/// // Dispatch the highest-priority requests up to the limit.
116/// pool.flush();
117///
118/// // Later, in the frame loop:
119/// let completed = pool.poll();  // also flushes freed slots
120/// for (url, result) in completed {
121///     // handle response
122/// }
123/// ```
124pub struct FetchPool {
125    /// The host-provided HTTP transport.
126    client: Box<dyn HttpClient>,
127
128    /// Maximum number of requests that may be in-flight simultaneously.
129    max_concurrent: usize,
130
131    /// Requests waiting to be dispatched, ordered by priority.
132    queue: Mutex<BinaryHeap<PrioritizedRequest>>,
133
134    /// URLs that are currently queued *or* in-flight (dedup set).
135    /// An entry is inserted on `enqueue` and removed when `poll`
136    /// returns the completed response, preventing duplicate requests
137    /// for the same URL.
138    known_urls: Mutex<HashSet<String>>,
139
140    /// URLs currently in-flight (sent to the HTTP client, awaiting response).
141    ///
142    /// Using a set of URLs instead of a plain counter eliminates
143    /// counting mismatches that arise when a deduplicating HTTP client
144    /// wrapper (e.g. [`SharedHttpClient`](crate::SharedHttpClient))
145    /// silently coalesces a re-sent URL with an existing in-flight
146    /// request.  With a set, `flush` inserts the URL (idempotent if
147    /// already present), `poll` removes it, and `force_cancel` removes
148    /// it immediately -- the set length is always accurate.
149    in_flight_urls: Mutex<HashSet<String>>,
150
151    /// URLs that were force-cancelled while already in-flight.
152    ///
153    /// When the HTTP response for such a URL arrives in [`poll`], it
154    /// is silently discarded (the URL was already removed from
155    /// `in_flight_urls` by `force_cancel`).  This set exists so that
156    /// `poll` can distinguish ghost responses from real completions.
157    cancelled_in_flight: Mutex<HashSet<String>>,
158
159    /// Monotonic insertion counter used to preserve FIFO ordering among
160    /// equal-priority requests.
161    sequence: AtomicU64,
162}
163
164impl FetchPool {
165    /// Create a new pool wrapping `client` with at most `max_concurrent`
166    /// simultaneous downloads.
167    ///
168    /// A `max_concurrent` of 0 is silently clamped to 1.
169    pub fn new(client: Box<dyn HttpClient>, max_concurrent: usize) -> Self {
170        Self {
171            client,
172            max_concurrent: max_concurrent.max(1),
173            queue: Mutex::new(BinaryHeap::new()),
174            known_urls: Mutex::new(HashSet::new()),
175            in_flight_urls: Mutex::new(HashSet::new()),
176            cancelled_in_flight: Mutex::new(HashSet::new()),
177            sequence: AtomicU64::new(0),
178        }
179    }
180
181    /// Add a URL to the priority queue.
182    ///
183    /// Lower `priority` = dispatched sooner.  Duplicate URLs (already
184    /// queued or in-flight) are silently ignored.
185    ///
186    /// The request is **not** sent until [`flush`](Self::flush) is called
187    /// (or implicitly via [`poll`](Self::poll)).  This allows the caller
188    /// to batch-enqueue a frame's worth of tiles before dispatching, so
189    /// the priority heap can sort them correctly.
190    pub fn enqueue(&self, request: HttpRequest, priority: f64) {
191        let url = request.url.clone();
192        let mut known = match self.known_urls.lock() {
193            Ok(u) => u,
194            Err(_) => return,
195        };
196        if !known.insert(url.clone()) {
197            return;
198        }
199        drop(known);
200
201        // If this URL was previously force-cancelled while in-flight but is
202        // now being re-enqueued, the eventual response should satisfy the new
203        // logical request rather than being discarded as a ghost.  Clear the
204        // ghost marker here so `poll` treats the next completion as real.
205        if let Ok(mut cancelled) = self.cancelled_in_flight.lock() {
206            cancelled.remove(&url);
207        }
208
209        if let Ok(mut queue) = self.queue.lock() {
210            queue.push(PrioritizedRequest {
211                request,
212                priority,
213                sequence: self.sequence.fetch_add(1, AtomicOrdering::Relaxed),
214            });
215        }
216    }
217
218    /// Dispatch queued requests up to the concurrency limit.
219    ///
220    /// The highest-priority (lowest score) requests are sent first.
221    /// Call this once per frame after all [`enqueue`](Self::enqueue)
222    /// calls for that frame.
223    pub fn flush(&self) {
224        let mut in_flight = match self.in_flight_urls.lock() {
225            Ok(f) => f,
226            Err(_) => return,
227        };
228        let mut queue = match self.queue.lock() {
229            Ok(q) => q,
230            Err(_) => return,
231        };
232
233        while in_flight.len() < self.max_concurrent {
234            match queue.pop() {
235                Some(req) => {
236                    let url = req.request.url.clone();
237                    self.client.send(req.request);
238                    in_flight.insert(url);
239                }
240                None => break,
241            }
242        }
243    }
244
245    /// Remove a URL from the pending queue if it has not been sent yet.
246    ///
247    /// Returns `true` if the URL was found and removed.  Already in-flight
248    /// requests are not affected.
249    pub fn cancel(&self, url: &str) -> bool {
250        let in_queue = self.queue.lock().is_ok_and(|q| {
251            q.iter().any(|r| r.request.url == url)
252        });
253        if !in_queue {
254            return false;
255        }
256
257        // Remove from the dedup set.
258        if let Ok(mut known) = self.known_urls.lock() {
259            known.remove(url);
260        }
261
262        // Rebuild the heap without the cancelled URL.  O(n) but
263        // cancellation is infrequent.
264        if let Ok(mut queue) = self.queue.lock() {
265            let old: Vec<_> = queue.drain().collect();
266            for req in old {
267                if req.request.url != url {
268                    queue.push(req);
269                }
270            }
271        }
272        true
273    }
274
275    /// Remove a URL from the dedup set regardless of whether it is
276    /// queued or in-flight.
277    ///
278    /// If the URL is still in the queue it is also removed from the
279    /// heap.  If it is already in-flight, the concurrency slot is
280    /// reclaimed immediately so that new requests can be dispatched
281    /// without waiting for the ghost HTTP response to arrive.  The
282    /// URL is recorded in an internal set so that [`poll`](Self::poll)
283    /// does not double-decrement `in_flight` when the ghost response
284    /// eventually completes.
285    ///
286    /// This allows the same URL to be re-enqueued immediately, which
287    /// is essential when the tile manager cancels a pending tile and
288    /// then re-requests it on a subsequent frame.
289    pub fn force_cancel(&self, url: &str) {
290        // Check whether the URL is still in the queue (not yet sent).
291        let was_queued = self.queue.lock().is_ok_and(|q| {
292            q.iter().any(|r| r.request.url == url)
293        });
294
295        // Always remove from the dedup set so re-enqueue is possible.
296        if let Ok(mut known) = self.known_urls.lock() {
297            known.remove(url);
298        }
299
300        if was_queued {
301            // Remove from the heap -- the request was never sent.
302            if let Ok(mut queue) = self.queue.lock() {
303                let old: Vec<_> = queue.drain().collect();
304                for req in old {
305                    if req.request.url != url {
306                        queue.push(req);
307                    }
308                }
309            }
310        } else {
311            // The URL is in-flight.  Immediately reclaim the concurrency
312            // slot so new requests can be dispatched without waiting for
313            // the ghost HTTP response to arrive.
314            if let Ok(mut in_flight) = self.in_flight_urls.lock() {
315                in_flight.remove(url);
316            }
317            // Record the ghost so `poll` knows the eventual response is
318            // orphaned and should not be forwarded to the caller.
319            if let Ok(mut cancelled) = self.cancelled_in_flight.lock() {
320                cancelled.insert(url.to_owned());
321            }
322        }
323    }
324
325    /// Discard all queued (not yet sent) requests.
326    ///
327    /// In-flight requests are unaffected -- they will still appear in
328    /// future [`poll`](Self::poll) results.
329    pub fn clear_queue(&self) {
330        if let Ok(mut queue) = self.queue.lock() {
331            // Remove queued URLs from the known set so they can be
332            // re-enqueued later if needed.
333            if let Ok(mut known) = self.known_urls.lock() {
334                for req in queue.iter() {
335                    known.remove(&req.request.url);
336                }
337            }
338            queue.clear();
339        }
340    }
341
342    /// Number of requests waiting in the queue (not yet sent).
343    pub fn queued_count(&self) -> usize {
344        self.queue.lock().map(|q| q.len()).unwrap_or(0)
345    }
346
347    /// Maximum number of concurrent in-flight requests.
348    #[inline]
349    pub fn max_concurrent(&self) -> usize {
350        self.max_concurrent
351    }
352
353    /// Number of requests currently in-flight (sent, awaiting response).
354    pub fn in_flight_count(&self) -> usize {
355        self.in_flight_urls.lock().map(|g| g.len()).unwrap_or(0)
356    }
357
358    /// Number of URLs currently tracked by the dedup set.
359    pub fn known_count(&self) -> usize {
360        self.known_urls.lock().map(|k| k.len()).unwrap_or(0)
361    }
362
363    /// Number of URLs recorded as force-cancelled while in-flight.
364    pub fn cancelled_in_flight_count(&self) -> usize {
365        self.cancelled_in_flight
366            .lock()
367            .map(|set| set.len())
368            .unwrap_or(0)
369    }
370
371    /// Returns `true` if `url` is known to the pool (queued or in-flight).
372    pub fn is_known(&self, url: &str) -> bool {
373        self.known_urls.lock().is_ok_and(|k| k.contains(url))
374    }
375
376    /// Poll the underlying [`HttpClient`] for completed responses and
377    /// release their concurrency slots.
378    ///
379    /// Any freed slots are immediately filled from the priority queue
380    /// (implicit flush).
381    ///
382    /// Returns `(url, Result<HttpResponse, String>)` for each completed
383    /// request.
384    pub fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
385        let results = self.client.poll();
386
387        if !results.is_empty() {
388            // Clean up ghost responses (force-cancelled while in-flight)
389            // and release concurrency slots for real completions.
390            let mut cancelled = self.cancelled_in_flight.lock().ok();
391            let mut in_flight = self.in_flight_urls.lock().ok();
392
393            for (url, _) in &results {
394                // If this URL was force-cancelled, it is a ghost -- the
395                // slot was already freed in force_cancel.  Just clean up
396                // the tracking set.
397                if let Some(ref mut set) = cancelled {
398                    if set.remove(url.as_str()) {
399                        continue;
400                    }
401                }
402                // Real completion: free the concurrency slot.
403                if let Some(ref mut urls) = in_flight {
404                    urls.remove(url.as_str());
405                }
406            }
407            drop(cancelled);
408            drop(in_flight);
409
410            // Remove completed URLs from the dedup set.
411            if let Ok(mut known) = self.known_urls.lock() {
412                for (url, _) in &results {
413                    known.remove(url);
414                }
415            }
416        }
417
418        // Fill any freed slots from the queue.
419        self.flush();
420
421        results
422    }
423}
424
425// ---------------------------------------------------------------------------
426// Tests
427// ---------------------------------------------------------------------------
428
429#[cfg(test)]
430mod tests {
431    use super::*;
432    use crate::io::shared_http_client::SharedHttpClient;
433    use std::sync::Arc;
434
435    // -- Helpers ----------------------------------------------------------
436
437    /// Mock that instantly completes every request on the next `poll`.
438    struct InstantMockClient {
439        sent: Mutex<Vec<String>>,
440    }
441
442    impl InstantMockClient {
443        fn new() -> Self {
444            Self {
445                sent: Mutex::new(Vec::new()),
446            }
447        }
448    }
449
450    impl HttpClient for InstantMockClient {
451        fn send(&self, request: HttpRequest) {
452            self.sent.lock().unwrap().push(request.url);
453        }
454
455        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
456            let sent = std::mem::take(&mut *self.sent.lock().unwrap());
457            sent.into_iter()
458                .map(|url| {
459                    (
460                        url,
461                        Ok(HttpResponse {
462                            status: 200,
463                            body: Vec::new(),
464                            headers: Vec::new(),
465                        }),
466                    )
467                })
468                .collect()
469        }
470    }
471
472    /// Mock that records send order but never completes.
473    struct DeferredMockClient {
474        sent: Mutex<Vec<String>>,
475    }
476
477    impl DeferredMockClient {
478        fn new() -> Self {
479            Self {
480                sent: Mutex::new(Vec::new()),
481            }
482        }
483    }
484
485    impl HttpClient for DeferredMockClient {
486        fn send(&self, request: HttpRequest) {
487            self.sent.lock().unwrap().push(request.url);
488        }
489
490        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
491            Vec::new()
492        }
493    }
494
495    /// Shared mock for inspecting send order after handing ownership
496    /// to FetchPool.
497    struct SharedMock(Arc<Mutex<Vec<String>>>);
498
499    impl HttpClient for SharedMock {
500        fn send(&self, request: HttpRequest) {
501            self.0.lock().unwrap().push(request.url);
502        }
503        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
504            Vec::new()
505        }
506    }
507
508    #[derive(Clone, Default)]
509    struct RecordingClient {
510        sent: Arc<Mutex<Vec<String>>>,
511        responses: Arc<Mutex<Vec<(String, Result<HttpResponse, String>)>>>,
512    }
513
514    impl RecordingClient {
515        fn sent_urls(&self) -> Vec<String> {
516            self.sent.lock().unwrap().clone()
517        }
518
519        fn complete(&self, url: &str) {
520            self.responses.lock().unwrap().push((
521                url.to_owned(),
522                Ok(HttpResponse {
523                    status: 200,
524                    body: Vec::new(),
525                    headers: Vec::new(),
526                }),
527            ));
528        }
529    }
530
531    impl HttpClient for RecordingClient {
532        fn send(&self, request: HttpRequest) {
533            self.sent.lock().unwrap().push(request.url);
534        }
535
536        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
537            std::mem::take(&mut *self.responses.lock().unwrap())
538        }
539    }
540
541    // -- Concurrency limit ------------------------------------------------
542
543    #[test]
544    fn respects_concurrency_limit() {
545        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);
546        pool.enqueue(HttpRequest::get("a"), 1.0);
547        pool.enqueue(HttpRequest::get("b"), 2.0);
548        pool.enqueue(HttpRequest::get("c"), 3.0);
549        pool.flush();
550
551        // Only 2 sent, 1 remains queued.
552        assert_eq!(pool.in_flight_count(), 2);
553        assert_eq!(pool.queued_count(), 1);
554    }
555
556    #[test]
557    fn freed_slots_dispatch_queued() {
558        let pool = FetchPool::new(Box::new(InstantMockClient::new()), 1);
559        pool.enqueue(HttpRequest::get("a"), 1.0);
560        pool.enqueue(HttpRequest::get("b"), 2.0);
561        pool.flush();
562
563        // 'a' sent (lower priority value), 'b' queued.
564        assert_eq!(pool.in_flight_count(), 1);
565        assert_eq!(pool.queued_count(), 1);
566
567        // Poll completes 'a', implicit flush sends 'b'.
568        let results = pool.poll();
569        assert_eq!(results.len(), 1);
570        assert_eq!(results[0].0, "a");
571        assert_eq!(pool.queued_count(), 0);
572        assert_eq!(pool.in_flight_count(), 1); // 'b' now in-flight
573    }
574
575    // -- Priority ordering ------------------------------------------------
576
577    #[test]
578    fn priority_order_nearest_first() {
579        let sent = Arc::new(Mutex::new(Vec::new()));
580        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);
581
582        // Enqueue in non-priority order.
583        pool.enqueue(HttpRequest::get("far"), 100.0);
584        pool.enqueue(HttpRequest::get("near"), 1.0);
585        pool.enqueue(HttpRequest::get("mid"), 50.0);
586
587        // Flush dispatches all three; heap sorts by priority.
588        pool.flush();
589
590        let order = sent.lock().unwrap().clone();
591        assert_eq!(order.len(), 3);
592        assert_eq!(order[0], "near");
593        assert_eq!(order[1], "mid");
594        assert_eq!(order[2], "far");
595    }
596
597    #[test]
598    fn equal_priority_preserves_enqueue_order() {
599        let sent = Arc::new(Mutex::new(Vec::new()));
600        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);
601
602        pool.enqueue(HttpRequest::get("a"), 1.0);
603        pool.enqueue(HttpRequest::get("b"), 1.0);
604        pool.enqueue(HttpRequest::get("c"), 1.0);
605        pool.flush();
606
607        let order = sent.lock().unwrap().clone();
608        assert_eq!(order, vec!["a", "b", "c"]);
609    }
610
611    // -- Duplicate suppression --------------------------------------------
612
613    #[test]
614    fn duplicate_enqueue_ignored() {
615        let sent = Arc::new(Mutex::new(Vec::new()));
616        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);
617        pool.enqueue(HttpRequest::get("a"), 1.0);
618        pool.enqueue(HttpRequest::get("a"), 2.0); // duplicate -- silently dropped
619        pool.flush();
620
621        assert_eq!(pool.in_flight_count(), 1);
622        assert_eq!(sent.lock().unwrap().len(), 1);
623    }
624
625    #[test]
626    fn duplicate_suppressed_while_in_flight() {
627        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
628        pool.enqueue(HttpRequest::get("a"), 1.0);
629        pool.flush(); // 'a' is now in-flight
630
631        // Try to enqueue 'a' again while it is in-flight.
632        pool.enqueue(HttpRequest::get("a"), 2.0);
633        assert_eq!(pool.queued_count(), 0, "should not re-queue in-flight URL");
634    }
635
636    #[test]
637    fn can_re_enqueue_after_completion() {
638        let pool = FetchPool::new(Box::new(InstantMockClient::new()), 10);
639        pool.enqueue(HttpRequest::get("a"), 1.0);
640        pool.flush();
641
642        // Complete it.
643        let results = pool.poll();
644        assert_eq!(results.len(), 1);
645
646        // Now re-enqueue should work.
647        pool.enqueue(HttpRequest::get("a"), 1.0);
648        pool.flush();
649        assert_eq!(pool.in_flight_count(), 1);
650    }
651
652    // -- Cancel -----------------------------------------------------------
653
654    #[test]
655    fn cancel_removes_queued_request() {
656        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
657        pool.enqueue(HttpRequest::get("a"), 1.0);
658        pool.enqueue(HttpRequest::get("b"), 2.0);
659        pool.enqueue(HttpRequest::get("c"), 3.0);
660        pool.flush();
661
662        // 'a' sent (priority 1.0), 'b' and 'c' queued.
663        assert_eq!(pool.queued_count(), 2);
664        assert!(pool.cancel("b"));
665        assert_eq!(pool.queued_count(), 1);
666    }
667
668    #[test]
669    fn cancel_nonexistent_returns_false() {
670        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
671        assert!(!pool.cancel("nope"));
672    }
673
674    #[test]
675    fn cancel_in_flight_returns_false() {
676        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
677        pool.enqueue(HttpRequest::get("a"), 1.0);
678        pool.flush();
679        // 'a' is in-flight, not queued.
680        assert!(!pool.cancel("a"));
681    }
682
683    // -- Clear queue ------------------------------------------------------
684
685    #[test]
686    fn clear_queue_discards_pending() {
687        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
688        pool.enqueue(HttpRequest::get("a"), 1.0);
689        pool.enqueue(HttpRequest::get("b"), 2.0);
690        pool.enqueue(HttpRequest::get("c"), 3.0);
691        pool.flush();
692
693        assert_eq!(pool.queued_count(), 2);
694        pool.clear_queue();
695        assert_eq!(pool.queued_count(), 0);
696        // In-flight request is unaffected.
697        assert_eq!(pool.in_flight_count(), 1);
698    }
699
700    #[test]
701    fn cleared_urls_can_be_re_enqueued() {
702        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
703        pool.enqueue(HttpRequest::get("a"), 1.0);
704        pool.enqueue(HttpRequest::get("b"), 2.0);
705        pool.flush(); // sends 'a', 'b' queued
706        pool.clear_queue(); // drops 'b' from queue and known set
707
708        pool.enqueue(HttpRequest::get("b"), 2.0); // should succeed
709        assert_eq!(pool.queued_count(), 1);
710    }
711
712    // -- Zero-concurrency edge case ---------------------------------------
713
714    #[test]
715    fn zero_concurrency_clamped_to_one() {
716        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 0);
717        pool.enqueue(HttpRequest::get("a"), 1.0);
718        pool.flush();
719        assert_eq!(pool.in_flight_count(), 1);
720    }
721
722    // -- Empty pool -------------------------------------------------------
723
724    #[test]
725    fn poll_empty_returns_empty() {
726        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
727        assert!(pool.poll().is_empty());
728        assert_eq!(pool.in_flight_count(), 0);
729        assert_eq!(pool.queued_count(), 0);
730    }
731
732    #[test]
733    fn flush_empty_is_noop() {
734        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
735        pool.flush(); // should not panic
736        assert_eq!(pool.in_flight_count(), 0);
737    }
738
739    // -- Force-cancel ghost slot reclamation ------------------------------
740
741    #[test]
742    fn force_cancel_immediately_reclaims_in_flight_slot() {
743        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);
744
745        pool.enqueue(HttpRequest::get("a"), 1.0);
746        pool.enqueue(HttpRequest::get("b"), 2.0);
747        pool.enqueue(HttpRequest::get("c"), 3.0);
748        pool.flush();
749
750        // 'a' and 'b' in-flight, 'c' queued.
751        assert_eq!(pool.in_flight_count(), 2);
752        assert_eq!(pool.queued_count(), 1);
753
754        // Force-cancel 'a' while it is in-flight.
755        pool.force_cancel("a");
756
757        // The slot should be reclaimed immediately.
758        assert_eq!(pool.in_flight_count(), 1, "ghost slot must be reclaimed immediately");
759        assert_eq!(pool.cancelled_in_flight_count(), 1);
760
761        // Flush should now dispatch 'c' into the freed slot.
762        pool.flush();
763        assert_eq!(pool.in_flight_count(), 2, "freed slot should accept queued request");
764        assert_eq!(pool.queued_count(), 0);
765    }
766
767    #[test]
768    fn ghost_response_does_not_double_decrement_in_flight() {
769        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);
770
771        pool.enqueue(HttpRequest::get("a"), 1.0);
772        pool.enqueue(HttpRequest::get("b"), 2.0);
773        pool.flush();
774        assert_eq!(pool.in_flight_count(), 2);
775
776        // Force-cancel 'a': slot reclaimed immediately.
777        pool.force_cancel("a");
778        assert_eq!(pool.in_flight_count(), 1);
779        assert_eq!(pool.cancelled_in_flight_count(), 1);
780
781        // poll returns nothing (DeferredMockClient), so ghost persists.
782        let _ = pool.poll();
783        assert_eq!(pool.in_flight_count(), 1);
784        assert_eq!(pool.cancelled_in_flight_count(), 1, "ghost persists until response arrives");
785    }
786
787    #[test]
788    fn force_cancel_allows_re_enqueue_of_in_flight_url() {
789        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
790        pool.enqueue(HttpRequest::get("a"), 1.0);
791        pool.flush();
792        assert_eq!(pool.in_flight_count(), 1);
793
794        pool.force_cancel("a");
795        assert_eq!(pool.in_flight_count(), 0);
796
797        // Should be able to re-enqueue immediately.
798        pool.enqueue(HttpRequest::get("a"), 1.0);
799        pool.flush();
800        assert_eq!(pool.in_flight_count(), 1);
801    }
802
803    #[test]
804    fn reenqueue_after_force_cancel_with_shared_dedup_completes_cleanly() {
805        let inner = RecordingClient::default();
806        let shared = SharedHttpClient::new(Box::new(inner.clone()));
807        let pool = FetchPool::new(Box::new(shared), 2);
808
809        pool.enqueue(HttpRequest::get("a"), 1.0);
810        pool.enqueue(HttpRequest::get("b"), 2.0);
811        pool.flush();
812        assert_eq!(pool.in_flight_count(), 2);
813        assert_eq!(inner.sent_urls(), vec!["a".to_string(), "b".to_string()]);
814
815        // Cancel the in-flight request and immediately re-enqueue the same URL.
816        pool.force_cancel("a");
817        assert_eq!(pool.in_flight_count(), 1);
818        assert_eq!(pool.cancelled_in_flight_count(), 1);
819
820        pool.enqueue(HttpRequest::get("a"), 0.5);
821        pool.flush();
822
823        // SharedHttpClient should dedup the resend against the original
824        // in-flight request, so no second network send for "a" occurs.
825        assert_eq!(pool.in_flight_count(), 2);
826        assert_eq!(inner.sent_urls(), vec!["a".to_string(), "b".to_string()]);
827        assert_eq!(pool.cancelled_in_flight_count(), 0, "re-enqueue should clear ghost marker");
828
829        // When the original response for "a" arrives, it must satisfy the new
830        // logical request rather than being discarded as a ghost.
831        inner.complete("a");
832        let results = pool.poll();
833        assert_eq!(results.len(), 1);
834        assert_eq!(results[0].0, "a");
835        assert_eq!(pool.in_flight_count(), 1, "completion should retire the re-enqueued URL");
836    }
837}