rustial_engine/io/fetch_pool.rs
1//! Concurrent fetch pool with configurable concurrency and viewport-center
2//! priority ordering.
3//!
4//! # Purpose
5//!
6//! [`FetchPool`] sits between the engine (which decides *which* tiles to
7//! fetch) and the host-provided [`HttpClient`] (which does the actual I/O).
8//! It adds two things the raw client does not have:
9//!
10//! 1. **Concurrency limit** -- prevents flooding the network with hundreds
11//! of requests at once when the user zooms out.
12//! 2. **Priority queue** -- tiles closest to the viewport center are sent
13//! first, so the area the user is looking at loads before periphery.
14//!
15//! # Request lifecycle
16//!
17//! ```text
//! enqueue(url, priority)  -- adds to priority queue
//!        |
//!        v
//!   +----------+   flush()   +-------------+
//!   |  queue   | ----------> | HttpClient  |
//!   |  (heap)  |             | (in-flight) |
//!   +----------+             +------+------+
//!                                   | poll()
//!                                   v
//!                           completed results
28//! ```
29//!
30//! Call [`enqueue`](FetchPool::enqueue) to add requests, then
31//! [`flush`](FetchPool::flush) to dispatch up to the concurrency limit.
32//! [`poll`](FetchPool::poll) collects completed responses and
33//! automatically flushes freed slots.
34//!
35//! # Thread safety
36//!
37//! The pool is `Send + Sync`. Internal state is protected by mutexes so
38//! it can be called from any thread. Lock scopes are kept short to
39//! avoid contention.
40//!
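//! # Cancellation
//!
//! Call [`cancel`](FetchPool::cancel) to remove a queued (not yet sent)
//! request. An in-flight request cannot be aborted at this level -- the
//! `HttpClient` may or may not support that -- but
//! [`force_cancel`](FetchPool::force_cancel) makes the pool forget it
//! immediately: its concurrency slot is reclaimed and the same URL can be
//! enqueued again.
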
46
47use crate::io::{HttpClient, HttpRequest, HttpResponse};
48use std::cmp::Ordering;
49use std::collections::{BinaryHeap, HashSet};
50use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
51use std::sync::Mutex;
52
53// ---------------------------------------------------------------------------
54// Priority wrapper
55// ---------------------------------------------------------------------------
56
57/// A fetch request annotated with a priority score.
58///
59/// Lower `priority` values are dispatched first (nearest to viewport center).
60#[derive(Debug)]
61struct PrioritizedRequest {
62 request: HttpRequest,
63 /// Distance-like metric from the viewport center.
64 /// Lower = more important = dispatched sooner.
65 priority: f64,
66 sequence: u64,
67}
68
69impl PartialEq for PrioritizedRequest {
70 fn eq(&self, other: &Self) -> bool {
71 self.request.url == other.request.url
72 }
73}
74impl Eq for PrioritizedRequest {}
75
76impl PartialOrd for PrioritizedRequest {
77 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
78 Some(self.cmp(other))
79 }
80}
81
82impl Ord for PrioritizedRequest {
83 fn cmp(&self, other: &Self) -> Ordering {
84 // BinaryHeap is a max-heap. We want lower priority values at the
85 // top, so we reverse the comparison. NaN is treated as worst
86 // priority (pushed to the bottom).
87 other
88 .priority
89 .partial_cmp(&self.priority)
90 .unwrap_or(Ordering::Equal)
91 .then_with(|| other.sequence.cmp(&self.sequence))
92 }
93}
94
95// ---------------------------------------------------------------------------
96// FetchPool
97// ---------------------------------------------------------------------------
98
99/// A concurrency-limited, priority-ordered download scheduler.
100///
101/// Wraps an [`HttpClient`] and adds a bounded queue that dispatches
102/// the most important requests first.
103///
104/// # Example
105///
106/// ```rust,ignore
107/// use rustial_engine::{FetchPool, HttpClient};
108///
109/// let pool = FetchPool::new(my_http_client, 6);
110///
111/// // Batch enqueue -- no requests are sent yet.
112/// pool.enqueue("https://tile.example.com/10/512/340.png".into(), 0.0);
113/// pool.enqueue("https://tile.example.com/10/513/340.png".into(), 1.0);
114///
115/// // Dispatch the highest-priority requests up to the limit.
116/// pool.flush();
117///
118/// // Later, in the frame loop:
119/// let completed = pool.poll(); // also flushes freed slots
120/// for (url, result) in completed {
121/// // handle response
122/// }
123/// ```
124pub struct FetchPool {
125 /// The host-provided HTTP transport.
126 client: Box<dyn HttpClient>,
127
128 /// Maximum number of requests that may be in-flight simultaneously.
129 max_concurrent: usize,
130
131 /// Requests waiting to be dispatched, ordered by priority.
132 queue: Mutex<BinaryHeap<PrioritizedRequest>>,
133
134 /// URLs that are currently queued *or* in-flight (dedup set).
135 /// An entry is inserted on `enqueue` and removed when `poll`
136 /// returns the completed response, preventing duplicate requests
137 /// for the same URL.
138 known_urls: Mutex<HashSet<String>>,
139
140 /// URLs currently in-flight (sent to the HTTP client, awaiting response).
141 ///
142 /// Using a set of URLs instead of a plain counter eliminates
143 /// counting mismatches that arise when a deduplicating HTTP client
144 /// wrapper (e.g. [`SharedHttpClient`](crate::SharedHttpClient))
145 /// silently coalesces a re-sent URL with an existing in-flight
146 /// request. With a set, `flush` inserts the URL (idempotent if
147 /// already present), `poll` removes it, and `force_cancel` removes
148 /// it immediately -- the set length is always accurate.
149 in_flight_urls: Mutex<HashSet<String>>,
150
151 /// URLs that were force-cancelled while already in-flight.
152 ///
153 /// When the HTTP response for such a URL arrives in [`poll`], it
154 /// is silently discarded (the URL was already removed from
155 /// `in_flight_urls` by `force_cancel`). This set exists so that
156 /// `poll` can distinguish ghost responses from real completions.
157 cancelled_in_flight: Mutex<HashSet<String>>,
158
159 /// Monotonic insertion counter used to preserve FIFO ordering among
160 /// equal-priority requests.
161 sequence: AtomicU64,
162}
163
164impl FetchPool {
165 /// Create a new pool wrapping `client` with at most `max_concurrent`
166 /// simultaneous downloads.
167 ///
168 /// A `max_concurrent` of 0 is silently clamped to 1.
169 pub fn new(client: Box<dyn HttpClient>, max_concurrent: usize) -> Self {
170 Self {
171 client,
172 max_concurrent: max_concurrent.max(1),
173 queue: Mutex::new(BinaryHeap::new()),
174 known_urls: Mutex::new(HashSet::new()),
175 in_flight_urls: Mutex::new(HashSet::new()),
176 cancelled_in_flight: Mutex::new(HashSet::new()),
177 sequence: AtomicU64::new(0),
178 }
179 }
180
181 /// Add a URL to the priority queue.
182 ///
183 /// Lower `priority` = dispatched sooner. Duplicate URLs (already
184 /// queued or in-flight) are silently ignored.
185 ///
186 /// The request is **not** sent until [`flush`](Self::flush) is called
187 /// (or implicitly via [`poll`](Self::poll)). This allows the caller
188 /// to batch-enqueue a frame's worth of tiles before dispatching, so
189 /// the priority heap can sort them correctly.
190 pub fn enqueue(&self, request: HttpRequest, priority: f64) {
191 let url = request.url.clone();
192 let mut known = match self.known_urls.lock() {
193 Ok(u) => u,
194 Err(_) => return,
195 };
196 if !known.insert(url.clone()) {
197 return;
198 }
199 drop(known);
200
201 // If this URL was previously force-cancelled while in-flight but is
202 // now being re-enqueued, the eventual response should satisfy the new
203 // logical request rather than being discarded as a ghost. Clear the
204 // ghost marker here so `poll` treats the next completion as real.
205 if let Ok(mut cancelled) = self.cancelled_in_flight.lock() {
206 cancelled.remove(&url);
207 }
208
209 if let Ok(mut queue) = self.queue.lock() {
210 queue.push(PrioritizedRequest {
211 request,
212 priority,
213 sequence: self.sequence.fetch_add(1, AtomicOrdering::Relaxed),
214 });
215 }
216 }
217
218 /// Dispatch queued requests up to the concurrency limit.
219 ///
220 /// The highest-priority (lowest score) requests are sent first.
221 /// Call this once per frame after all [`enqueue`](Self::enqueue)
222 /// calls for that frame.
223 pub fn flush(&self) {
224 let mut in_flight = match self.in_flight_urls.lock() {
225 Ok(f) => f,
226 Err(_) => return,
227 };
228 let mut queue = match self.queue.lock() {
229 Ok(q) => q,
230 Err(_) => return,
231 };
232
233 while in_flight.len() < self.max_concurrent {
234 match queue.pop() {
235 Some(req) => {
236 let url = req.request.url.clone();
237 self.client.send(req.request);
238 in_flight.insert(url);
239 }
240 None => break,
241 }
242 }
243 }
244
245 /// Remove a URL from the pending queue if it has not been sent yet.
246 ///
247 /// Returns `true` if the URL was found and removed. Already in-flight
248 /// requests are not affected.
249 pub fn cancel(&self, url: &str) -> bool {
250 let in_queue = self
251 .queue
252 .lock()
253 .is_ok_and(|q| q.iter().any(|r| r.request.url == url));
254 if !in_queue {
255 return false;
256 }
257
258 // Remove from the dedup set.
259 if let Ok(mut known) = self.known_urls.lock() {
260 known.remove(url);
261 }
262
263 // Rebuild the heap without the cancelled URL. O(n) but
264 // cancellation is infrequent.
265 if let Ok(mut queue) = self.queue.lock() {
266 let old: Vec<_> = queue.drain().collect();
267 for req in old {
268 if req.request.url != url {
269 queue.push(req);
270 }
271 }
272 }
273 true
274 }
275
276 /// Remove a URL from the dedup set regardless of whether it is
277 /// queued or in-flight.
278 ///
279 /// If the URL is still in the queue it is also removed from the
280 /// heap. If it is already in-flight, the concurrency slot is
281 /// reclaimed immediately so that new requests can be dispatched
282 /// without waiting for the ghost HTTP response to arrive. The
283 /// URL is recorded in an internal set so that [`poll`](Self::poll)
284 /// does not double-decrement `in_flight` when the ghost response
285 /// eventually completes.
286 ///
287 /// This allows the same URL to be re-enqueued immediately, which
288 /// is essential when the tile manager cancels a pending tile and
289 /// then re-requests it on a subsequent frame.
290 pub fn force_cancel(&self, url: &str) {
291 // Check whether the URL is still in the queue (not yet sent).
292 let was_queued = self
293 .queue
294 .lock()
295 .is_ok_and(|q| q.iter().any(|r| r.request.url == url));
296
297 // Always remove from the dedup set so re-enqueue is possible.
298 if let Ok(mut known) = self.known_urls.lock() {
299 known.remove(url);
300 }
301
302 if was_queued {
303 // Remove from the heap -- the request was never sent.
304 if let Ok(mut queue) = self.queue.lock() {
305 let old: Vec<_> = queue.drain().collect();
306 for req in old {
307 if req.request.url != url {
308 queue.push(req);
309 }
310 }
311 }
312 } else {
313 // The URL is in-flight. Immediately reclaim the concurrency
314 // slot so new requests can be dispatched without waiting for
315 // the ghost HTTP response to arrive.
316 if let Ok(mut in_flight) = self.in_flight_urls.lock() {
317 in_flight.remove(url);
318 }
319 // Record the ghost so `poll` knows the eventual response is
320 // orphaned and should not be forwarded to the caller.
321 if let Ok(mut cancelled) = self.cancelled_in_flight.lock() {
322 cancelled.insert(url.to_owned());
323 }
324 }
325 }
326
327 /// Discard all queued (not yet sent) requests.
328 ///
329 /// In-flight requests are unaffected -- they will still appear in
330 /// future [`poll`](Self::poll) results.
331 pub fn clear_queue(&self) {
332 if let Ok(mut queue) = self.queue.lock() {
333 // Remove queued URLs from the known set so they can be
334 // re-enqueued later if needed.
335 if let Ok(mut known) = self.known_urls.lock() {
336 for req in queue.iter() {
337 known.remove(&req.request.url);
338 }
339 }
340 queue.clear();
341 }
342 }
343
344 /// Number of requests waiting in the queue (not yet sent).
345 pub fn queued_count(&self) -> usize {
346 self.queue.lock().map(|q| q.len()).unwrap_or(0)
347 }
348
349 /// Maximum number of concurrent in-flight requests.
350 #[inline]
351 pub fn max_concurrent(&self) -> usize {
352 self.max_concurrent
353 }
354
355 /// Number of requests currently in-flight (sent, awaiting response).
356 pub fn in_flight_count(&self) -> usize {
357 self.in_flight_urls.lock().map(|g| g.len()).unwrap_or(0)
358 }
359
360 /// Number of URLs currently tracked by the dedup set.
361 pub fn known_count(&self) -> usize {
362 self.known_urls.lock().map(|k| k.len()).unwrap_or(0)
363 }
364
365 /// Number of URLs recorded as force-cancelled while in-flight.
366 pub fn cancelled_in_flight_count(&self) -> usize {
367 self.cancelled_in_flight
368 .lock()
369 .map(|set| set.len())
370 .unwrap_or(0)
371 }
372
373 /// Returns `true` if `url` is known to the pool (queued or in-flight).
374 pub fn is_known(&self, url: &str) -> bool {
375 self.known_urls.lock().is_ok_and(|k| k.contains(url))
376 }
377
378 /// Poll the underlying [`HttpClient`] for completed responses and
379 /// release their concurrency slots.
380 ///
381 /// Any freed slots are immediately filled from the priority queue
382 /// (implicit flush).
383 ///
384 /// Returns `(url, Result<HttpResponse, String>)` for each completed
385 /// request.
386 pub fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
387 let results = self.client.poll();
388
389 if !results.is_empty() {
390 // Clean up ghost responses (force-cancelled while in-flight)
391 // and release concurrency slots for real completions.
392 let mut cancelled = self.cancelled_in_flight.lock().ok();
393 let mut in_flight = self.in_flight_urls.lock().ok();
394
395 for (url, _) in &results {
396 // If this URL was force-cancelled, it is a ghost -- the
397 // slot was already freed in force_cancel. Just clean up
398 // the tracking set.
399 if let Some(ref mut set) = cancelled {
400 if set.remove(url.as_str()) {
401 continue;
402 }
403 }
404 // Real completion: free the concurrency slot.
405 if let Some(ref mut urls) = in_flight {
406 urls.remove(url.as_str());
407 }
408 }
409 drop(cancelled);
410 drop(in_flight);
411
412 // Remove completed URLs from the dedup set.
413 if let Ok(mut known) = self.known_urls.lock() {
414 for (url, _) in &results {
415 known.remove(url);
416 }
417 }
418 }
419
420 // Fill any freed slots from the queue.
421 self.flush();
422
423 results
424 }
425}
426
427// ---------------------------------------------------------------------------
428// Tests
429// ---------------------------------------------------------------------------
430
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::shared_http_client::SharedHttpClient;
    use std::sync::Arc;

    // -- Helpers ----------------------------------------------------------

    /// Mock that instantly completes every request on the next `poll`.
    struct InstantMockClient {
        sent: Mutex<Vec<String>>,
    }

    impl InstantMockClient {
        fn new() -> Self {
            Self {
                sent: Mutex::new(Vec::new()),
            }
        }
    }

    impl HttpClient for InstantMockClient {
        fn send(&self, request: HttpRequest) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let sent = std::mem::take(&mut *self.sent.lock().unwrap());
            sent.into_iter()
                .map(|url| {
                    (
                        url,
                        Ok(HttpResponse {
                            status: 200,
                            body: Vec::new(),
                            headers: Vec::new(),
                        }),
                    )
                })
                .collect()
        }
    }

    /// Mock that records send order but never completes.
    struct DeferredMockClient {
        sent: Mutex<Vec<String>>,
    }

    impl DeferredMockClient {
        fn new() -> Self {
            Self {
                sent: Mutex::new(Vec::new()),
            }
        }
    }

    impl HttpClient for DeferredMockClient {
        fn send(&self, request: HttpRequest) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            Vec::new()
        }
    }

    /// Shared mock for inspecting send order after handing ownership
    /// to FetchPool.
    struct SharedMock(Arc<Mutex<Vec<String>>>);

    impl HttpClient for SharedMock {
        fn send(&self, request: HttpRequest) {
            self.0.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            Vec::new()
        }
    }

    /// Mock whose responses are delivered only when the test calls
    /// `complete`; clones share the same recorded state.
    #[derive(Clone, Default)]
    #[allow(clippy::type_complexity)]
    struct RecordingClient {
        sent: Arc<Mutex<Vec<String>>>,
        responses: Arc<Mutex<Vec<(String, Result<HttpResponse, String>)>>>,
    }

    impl RecordingClient {
        fn sent_urls(&self) -> Vec<String> {
            self.sent.lock().unwrap().clone()
        }

        fn complete(&self, url: &str) {
            self.responses.lock().unwrap().push((
                url.to_owned(),
                Ok(HttpResponse {
                    status: 200,
                    body: Vec::new(),
                    headers: Vec::new(),
                }),
            ));
        }
    }

    impl HttpClient for RecordingClient {
        fn send(&self, request: HttpRequest) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            std::mem::take(&mut *self.responses.lock().unwrap())
        }
    }

    // -- Concurrency limit ------------------------------------------------

    #[test]
    fn respects_concurrency_limit() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.enqueue(HttpRequest::get("c"), 3.0);
        pool.flush();

        // Only 2 sent, 1 remains queued.
        assert_eq!(pool.in_flight_count(), 2);
        assert_eq!(pool.queued_count(), 1);
    }

    #[test]
    fn freed_slots_dispatch_queued() {
        let pool = FetchPool::new(Box::new(InstantMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush();

        // 'a' sent (lower priority value), 'b' queued.
        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(pool.queued_count(), 1);

        // Poll completes 'a', implicit flush sends 'b'.
        let results = pool.poll();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "a");
        assert_eq!(pool.queued_count(), 0);
        assert_eq!(pool.in_flight_count(), 1); // 'b' now in-flight
    }

    // -- Priority ordering ------------------------------------------------

    #[test]
    fn priority_order_nearest_first() {
        let sent = Arc::new(Mutex::new(Vec::new()));
        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);

        // Enqueue in non-priority order.
        pool.enqueue(HttpRequest::get("far"), 100.0);
        pool.enqueue(HttpRequest::get("near"), 1.0);
        pool.enqueue(HttpRequest::get("mid"), 50.0);

        // Flush dispatches all three; heap sorts by priority.
        pool.flush();

        let order = sent.lock().unwrap().clone();
        assert_eq!(order.len(), 3);
        assert_eq!(order[0], "near");
        assert_eq!(order[1], "mid");
        assert_eq!(order[2], "far");
    }

    #[test]
    fn equal_priority_preserves_enqueue_order() {
        let sent = Arc::new(Mutex::new(Vec::new()));
        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);

        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 1.0);
        pool.enqueue(HttpRequest::get("c"), 1.0);
        pool.flush();

        let order = sent.lock().unwrap().clone();
        assert_eq!(order, vec!["a", "b", "c"]);
    }

    // -- Duplicate suppression --------------------------------------------

    #[test]
    fn duplicate_enqueue_ignored() {
        let sent = Arc::new(Mutex::new(Vec::new()));
        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("a"), 2.0); // duplicate -- silently dropped
        pool.flush();

        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(sent.lock().unwrap().len(), 1);
    }

    #[test]
    fn duplicate_suppressed_while_in_flight() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush(); // 'a' is now in-flight

        // Try to enqueue 'a' again while it is in-flight.
        pool.enqueue(HttpRequest::get("a"), 2.0);
        assert_eq!(pool.queued_count(), 0, "should not re-queue in-flight URL");
    }

    #[test]
    fn can_re_enqueue_after_completion() {
        let pool = FetchPool::new(Box::new(InstantMockClient::new()), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();

        // Complete it.
        let results = pool.poll();
        assert_eq!(results.len(), 1);

        // Now re-enqueue should work.
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 1);
    }

    // -- Cancel -----------------------------------------------------------

    #[test]
    fn cancel_removes_queued_request() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.enqueue(HttpRequest::get("c"), 3.0);
        pool.flush();

        // 'a' sent (priority 1.0), 'b' and 'c' queued.
        assert_eq!(pool.queued_count(), 2);
        assert!(pool.cancel("b"));
        assert_eq!(pool.queued_count(), 1);
    }

    #[test]
    fn cancel_nonexistent_returns_false() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        assert!(!pool.cancel("nope"));
    }

    #[test]
    fn cancel_in_flight_returns_false() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        // 'a' is in-flight, not queued.
        assert!(!pool.cancel("a"));
    }

    // -- Clear queue ------------------------------------------------------

    #[test]
    fn clear_queue_discards_pending() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.enqueue(HttpRequest::get("c"), 3.0);
        pool.flush();

        assert_eq!(pool.queued_count(), 2);
        pool.clear_queue();
        assert_eq!(pool.queued_count(), 0);
        // In-flight request is unaffected.
        assert_eq!(pool.in_flight_count(), 1);
    }

    #[test]
    fn cleared_urls_can_be_re_enqueued() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush(); // sends 'a', 'b' queued
        pool.clear_queue(); // drops 'b' from queue and known set

        pool.enqueue(HttpRequest::get("b"), 2.0); // should succeed
        assert_eq!(pool.queued_count(), 1);
    }

    // -- Zero-concurrency edge case ---------------------------------------

    #[test]
    fn zero_concurrency_clamped_to_one() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 0);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 1);
    }

    // -- Empty pool -------------------------------------------------------

    #[test]
    fn poll_empty_returns_empty() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        assert!(pool.poll().is_empty());
        assert_eq!(pool.in_flight_count(), 0);
        assert_eq!(pool.queued_count(), 0);
    }

    #[test]
    fn flush_empty_is_noop() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        pool.flush(); // should not panic
        assert_eq!(pool.in_flight_count(), 0);
    }

    // -- Force-cancel ghost slot reclamation ------------------------------

    #[test]
    fn force_cancel_immediately_reclaims_in_flight_slot() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);

        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.enqueue(HttpRequest::get("c"), 3.0);
        pool.flush();

        // 'a' and 'b' in-flight, 'c' queued.
        assert_eq!(pool.in_flight_count(), 2);
        assert_eq!(pool.queued_count(), 1);

        // Force-cancel 'a' while it is in-flight.
        pool.force_cancel("a");

        // The slot should be reclaimed immediately.
        assert_eq!(
            pool.in_flight_count(),
            1,
            "ghost slot must be reclaimed immediately"
        );
        assert_eq!(pool.cancelled_in_flight_count(), 1);

        // Flush should now dispatch 'c' into the freed slot.
        pool.flush();
        assert_eq!(
            pool.in_flight_count(),
            2,
            "freed slot should accept queued request"
        );
        assert_eq!(pool.queued_count(), 0);
    }

    #[test]
    fn force_cancel_removes_queued_request() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush(); // 'a' in-flight, 'b' queued

        // 'b' was never sent, so there is no ghost response to track.
        pool.force_cancel("b");
        assert_eq!(pool.queued_count(), 0);
        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(pool.cancelled_in_flight_count(), 0);
        assert!(!pool.is_known("b"));
    }

    #[test]
    fn ghost_response_does_not_double_decrement_in_flight() {
        let client = RecordingClient::default();
        let pool = FetchPool::new(Box::new(client.clone()), 2);

        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 2);

        // Force-cancel 'a': slot reclaimed immediately.
        pool.force_cancel("a");
        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(pool.cancelled_in_flight_count(), 1);

        // No response has arrived yet, so the ghost marker persists.
        let _ = pool.poll();
        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(
            pool.cancelled_in_flight_count(),
            1,
            "ghost persists until response arrives"
        );

        // The ghost response for 'a' finally arrives. 'b' must still hold
        // its slot, and the marker must be cleaned up.
        client.complete("a");
        let _ = pool.poll();
        assert_eq!(
            pool.in_flight_count(),
            1,
            "ghost response must not release another request's slot"
        );
        assert_eq!(
            pool.cancelled_in_flight_count(),
            0,
            "ghost marker is cleared once its response arrives"
        );
    }

    #[test]
    fn force_cancel_allows_re_enqueue_of_in_flight_url() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 1);

        pool.force_cancel("a");
        assert_eq!(pool.in_flight_count(), 0);

        // Should be able to re-enqueue immediately.
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 1);
    }

    #[test]
    fn reenqueue_after_force_cancel_with_shared_dedup_completes_cleanly() {
        let inner = RecordingClient::default();
        let shared = SharedHttpClient::new(Box::new(inner.clone()));
        let pool = FetchPool::new(Box::new(shared), 2);

        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 2);
        assert_eq!(inner.sent_urls(), vec!["a".to_string(), "b".to_string()]);

        // Cancel the in-flight request and immediately re-enqueue the same URL.
        pool.force_cancel("a");
        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(pool.cancelled_in_flight_count(), 1);

        pool.enqueue(HttpRequest::get("a"), 0.5);
        pool.flush();

        // SharedHttpClient should dedup the resend against the original
        // in-flight request, so no second network send for "a" occurs.
        assert_eq!(pool.in_flight_count(), 2);
        assert_eq!(inner.sent_urls(), vec!["a".to_string(), "b".to_string()]);
        assert_eq!(
            pool.cancelled_in_flight_count(),
            0,
            "re-enqueue should clear ghost marker"
        );

        // When the original response for "a" arrives, it must satisfy the new
        // logical request rather than being discarded as a ghost.
        inner.complete("a");
        let results = pool.poll();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "a");
        assert_eq!(
            pool.in_flight_count(),
            1,
            "completion should retire the re-enqueued URL"
        );
    }
}