rustial_engine/io/fetch_pool.rs
1//! Concurrent fetch pool with configurable concurrency and viewport-center
2//! priority ordering.
3//!
4//! # Purpose
5//!
6//! [`FetchPool`] sits between the engine (which decides *which* tiles to
7//! fetch) and the host-provided [`HttpClient`] (which does the actual I/O).
8//! It adds two things the raw client does not have:
9//!
10//! 1. **Concurrency limit** -- prevents flooding the network with hundreds
11//! of requests at once when the user zooms out.
12//! 2. **Priority queue** -- tiles closest to the viewport center are sent
13//! first, so the area the user is looking at loads before periphery.
14//!
15//! # Request lifecycle
16//!
//! ```text
//!  enqueue(request, priority)    -- adds to priority queue
//!        |
//!        v
//!   +----------+   flush()    +-------------+
//!   |  queue   | -----------> | HttpClient  |
//!   |  (heap)  |              | (in-flight) |
//!   +----------+              +------+------+
//!                                    | poll()
//!                                    v
//!                            completed results
//! ```
29//!
30//! Call [`enqueue`](FetchPool::enqueue) to add requests, then
31//! [`flush`](FetchPool::flush) to dispatch up to the concurrency limit.
32//! [`poll`](FetchPool::poll) collects completed responses and
33//! automatically flushes freed slots.
34//!
35//! # Thread safety
36//!
37//! The pool is `Send + Sync`. Internal state is protected by mutexes so
38//! it can be called from any thread. Lock scopes are kept short to
39//! avoid contention.
40//!
41//! # Cancellation
42//!
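//! Call [`cancel`](FetchPool::cancel) to remove a queued (not yet sent)
//! request. An already in-flight request cannot be aborted at this level
//! -- the `HttpClient` may or may not support that -- but
//! [`force_cancel`](FetchPool::force_cancel) releases its concurrency
//! slot and dedup entry immediately so the URL can be re-enqueued.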
46
47use crate::io::{HttpClient, HttpRequest, HttpResponse};
48use std::cmp::Ordering;
49use std::collections::{BinaryHeap, HashSet};
50use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
51use std::sync::Mutex;
52
53// ---------------------------------------------------------------------------
54// Priority wrapper
55// ---------------------------------------------------------------------------
56
57/// A fetch request annotated with a priority score.
58///
59/// Lower `priority` values are dispatched first (nearest to viewport center).
60#[derive(Debug)]
61struct PrioritizedRequest {
62 request: HttpRequest,
63 /// Distance-like metric from the viewport center.
64 /// Lower = more important = dispatched sooner.
65 priority: f64,
66 sequence: u64,
67}
68
69impl PartialEq for PrioritizedRequest {
70 fn eq(&self, other: &Self) -> bool {
71 self.request.url == other.request.url
72 }
73}
74impl Eq for PrioritizedRequest {}
75
76impl PartialOrd for PrioritizedRequest {
77 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
78 Some(self.cmp(other))
79 }
80}
81
82impl Ord for PrioritizedRequest {
83 fn cmp(&self, other: &Self) -> Ordering {
84 // BinaryHeap is a max-heap. We want lower priority values at the
85 // top, so we reverse the comparison. NaN is treated as worst
86 // priority (pushed to the bottom).
87 other
88 .priority
89 .partial_cmp(&self.priority)
90 .unwrap_or(Ordering::Equal)
91 .then_with(|| other.sequence.cmp(&self.sequence))
92 }
93}
94
95// ---------------------------------------------------------------------------
96// FetchPool
97// ---------------------------------------------------------------------------
98
99/// A concurrency-limited, priority-ordered download scheduler.
100///
101/// Wraps an [`HttpClient`] and adds a bounded queue that dispatches
102/// the most important requests first.
103///
104/// # Example
105///
106/// ```rust,ignore
107/// use rustial_engine::{FetchPool, HttpClient};
108///
109/// let pool = FetchPool::new(my_http_client, 6);
110///
111/// // Batch enqueue -- no requests are sent yet.
112/// pool.enqueue("https://tile.example.com/10/512/340.png".into(), 0.0);
113/// pool.enqueue("https://tile.example.com/10/513/340.png".into(), 1.0);
114///
115/// // Dispatch the highest-priority requests up to the limit.
116/// pool.flush();
117///
118/// // Later, in the frame loop:
119/// let completed = pool.poll(); // also flushes freed slots
120/// for (url, result) in completed {
121/// // handle response
122/// }
123/// ```
124pub struct FetchPool {
125 /// The host-provided HTTP transport.
126 client: Box<dyn HttpClient>,
127
128 /// Maximum number of requests that may be in-flight simultaneously.
129 max_concurrent: usize,
130
131 /// Requests waiting to be dispatched, ordered by priority.
132 queue: Mutex<BinaryHeap<PrioritizedRequest>>,
133
134 /// URLs that are currently queued *or* in-flight (dedup set).
135 /// An entry is inserted on `enqueue` and removed when `poll`
136 /// returns the completed response, preventing duplicate requests
137 /// for the same URL.
138 known_urls: Mutex<HashSet<String>>,
139
140 /// URLs currently in-flight (sent to the HTTP client, awaiting response).
141 ///
142 /// Using a set of URLs instead of a plain counter eliminates
143 /// counting mismatches that arise when a deduplicating HTTP client
144 /// wrapper (e.g. [`SharedHttpClient`](crate::SharedHttpClient))
145 /// silently coalesces a re-sent URL with an existing in-flight
146 /// request. With a set, `flush` inserts the URL (idempotent if
147 /// already present), `poll` removes it, and `force_cancel` removes
148 /// it immediately -- the set length is always accurate.
149 in_flight_urls: Mutex<HashSet<String>>,
150
151 /// URLs that were force-cancelled while already in-flight.
152 ///
153 /// When the HTTP response for such a URL arrives in [`poll`], it
154 /// is silently discarded (the URL was already removed from
155 /// `in_flight_urls` by `force_cancel`). This set exists so that
156 /// `poll` can distinguish ghost responses from real completions.
157 cancelled_in_flight: Mutex<HashSet<String>>,
158
159 /// Monotonic insertion counter used to preserve FIFO ordering among
160 /// equal-priority requests.
161 sequence: AtomicU64,
162}
163
164impl FetchPool {
165 /// Create a new pool wrapping `client` with at most `max_concurrent`
166 /// simultaneous downloads.
167 ///
168 /// A `max_concurrent` of 0 is silently clamped to 1.
169 pub fn new(client: Box<dyn HttpClient>, max_concurrent: usize) -> Self {
170 Self {
171 client,
172 max_concurrent: max_concurrent.max(1),
173 queue: Mutex::new(BinaryHeap::new()),
174 known_urls: Mutex::new(HashSet::new()),
175 in_flight_urls: Mutex::new(HashSet::new()),
176 cancelled_in_flight: Mutex::new(HashSet::new()),
177 sequence: AtomicU64::new(0),
178 }
179 }
180
181 /// Add a URL to the priority queue.
182 ///
183 /// Lower `priority` = dispatched sooner. Duplicate URLs (already
184 /// queued or in-flight) are silently ignored.
185 ///
186 /// The request is **not** sent until [`flush`](Self::flush) is called
187 /// (or implicitly via [`poll`](Self::poll)). This allows the caller
188 /// to batch-enqueue a frame's worth of tiles before dispatching, so
189 /// the priority heap can sort them correctly.
190 pub fn enqueue(&self, request: HttpRequest, priority: f64) {
191 let url = request.url.clone();
192 let mut known = match self.known_urls.lock() {
193 Ok(u) => u,
194 Err(_) => return,
195 };
196 if !known.insert(url.clone()) {
197 return;
198 }
199 drop(known);
200
201 // If this URL was previously force-cancelled while in-flight but is
202 // now being re-enqueued, the eventual response should satisfy the new
203 // logical request rather than being discarded as a ghost. Clear the
204 // ghost marker here so `poll` treats the next completion as real.
205 if let Ok(mut cancelled) = self.cancelled_in_flight.lock() {
206 cancelled.remove(&url);
207 }
208
209 if let Ok(mut queue) = self.queue.lock() {
210 queue.push(PrioritizedRequest {
211 request,
212 priority,
213 sequence: self.sequence.fetch_add(1, AtomicOrdering::Relaxed),
214 });
215 }
216 }
217
218 /// Dispatch queued requests up to the concurrency limit.
219 ///
220 /// The highest-priority (lowest score) requests are sent first.
221 /// Call this once per frame after all [`enqueue`](Self::enqueue)
222 /// calls for that frame.
223 pub fn flush(&self) {
224 let mut in_flight = match self.in_flight_urls.lock() {
225 Ok(f) => f,
226 Err(_) => return,
227 };
228 let mut queue = match self.queue.lock() {
229 Ok(q) => q,
230 Err(_) => return,
231 };
232
233 while in_flight.len() < self.max_concurrent {
234 match queue.pop() {
235 Some(req) => {
236 let url = req.request.url.clone();
237 self.client.send(req.request);
238 in_flight.insert(url);
239 }
240 None => break,
241 }
242 }
243 }
244
245 /// Remove a URL from the pending queue if it has not been sent yet.
246 ///
247 /// Returns `true` if the URL was found and removed. Already in-flight
248 /// requests are not affected.
249 pub fn cancel(&self, url: &str) -> bool {
250 let in_queue = self.queue.lock().is_ok_and(|q| {
251 q.iter().any(|r| r.request.url == url)
252 });
253 if !in_queue {
254 return false;
255 }
256
257 // Remove from the dedup set.
258 if let Ok(mut known) = self.known_urls.lock() {
259 known.remove(url);
260 }
261
262 // Rebuild the heap without the cancelled URL. O(n) but
263 // cancellation is infrequent.
264 if let Ok(mut queue) = self.queue.lock() {
265 let old: Vec<_> = queue.drain().collect();
266 for req in old {
267 if req.request.url != url {
268 queue.push(req);
269 }
270 }
271 }
272 true
273 }
274
275 /// Remove a URL from the dedup set regardless of whether it is
276 /// queued or in-flight.
277 ///
278 /// If the URL is still in the queue it is also removed from the
279 /// heap. If it is already in-flight, the concurrency slot is
280 /// reclaimed immediately so that new requests can be dispatched
281 /// without waiting for the ghost HTTP response to arrive. The
282 /// URL is recorded in an internal set so that [`poll`](Self::poll)
283 /// does not double-decrement `in_flight` when the ghost response
284 /// eventually completes.
285 ///
286 /// This allows the same URL to be re-enqueued immediately, which
287 /// is essential when the tile manager cancels a pending tile and
288 /// then re-requests it on a subsequent frame.
289 pub fn force_cancel(&self, url: &str) {
290 // Check whether the URL is still in the queue (not yet sent).
291 let was_queued = self.queue.lock().is_ok_and(|q| {
292 q.iter().any(|r| r.request.url == url)
293 });
294
295 // Always remove from the dedup set so re-enqueue is possible.
296 if let Ok(mut known) = self.known_urls.lock() {
297 known.remove(url);
298 }
299
300 if was_queued {
301 // Remove from the heap -- the request was never sent.
302 if let Ok(mut queue) = self.queue.lock() {
303 let old: Vec<_> = queue.drain().collect();
304 for req in old {
305 if req.request.url != url {
306 queue.push(req);
307 }
308 }
309 }
310 } else {
311 // The URL is in-flight. Immediately reclaim the concurrency
312 // slot so new requests can be dispatched without waiting for
313 // the ghost HTTP response to arrive.
314 if let Ok(mut in_flight) = self.in_flight_urls.lock() {
315 in_flight.remove(url);
316 }
317 // Record the ghost so `poll` knows the eventual response is
318 // orphaned and should not be forwarded to the caller.
319 if let Ok(mut cancelled) = self.cancelled_in_flight.lock() {
320 cancelled.insert(url.to_owned());
321 }
322 }
323 }
324
325 /// Discard all queued (not yet sent) requests.
326 ///
327 /// In-flight requests are unaffected -- they will still appear in
328 /// future [`poll`](Self::poll) results.
329 pub fn clear_queue(&self) {
330 if let Ok(mut queue) = self.queue.lock() {
331 // Remove queued URLs from the known set so they can be
332 // re-enqueued later if needed.
333 if let Ok(mut known) = self.known_urls.lock() {
334 for req in queue.iter() {
335 known.remove(&req.request.url);
336 }
337 }
338 queue.clear();
339 }
340 }
341
342 /// Number of requests waiting in the queue (not yet sent).
343 pub fn queued_count(&self) -> usize {
344 self.queue.lock().map(|q| q.len()).unwrap_or(0)
345 }
346
347 /// Maximum number of concurrent in-flight requests.
348 #[inline]
349 pub fn max_concurrent(&self) -> usize {
350 self.max_concurrent
351 }
352
353 /// Number of requests currently in-flight (sent, awaiting response).
354 pub fn in_flight_count(&self) -> usize {
355 self.in_flight_urls.lock().map(|g| g.len()).unwrap_or(0)
356 }
357
358 /// Number of URLs currently tracked by the dedup set.
359 pub fn known_count(&self) -> usize {
360 self.known_urls.lock().map(|k| k.len()).unwrap_or(0)
361 }
362
363 /// Number of URLs recorded as force-cancelled while in-flight.
364 pub fn cancelled_in_flight_count(&self) -> usize {
365 self.cancelled_in_flight
366 .lock()
367 .map(|set| set.len())
368 .unwrap_or(0)
369 }
370
371 /// Returns `true` if `url` is known to the pool (queued or in-flight).
372 pub fn is_known(&self, url: &str) -> bool {
373 self.known_urls.lock().is_ok_and(|k| k.contains(url))
374 }
375
376 /// Poll the underlying [`HttpClient`] for completed responses and
377 /// release their concurrency slots.
378 ///
379 /// Any freed slots are immediately filled from the priority queue
380 /// (implicit flush).
381 ///
382 /// Returns `(url, Result<HttpResponse, String>)` for each completed
383 /// request.
384 pub fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
385 let results = self.client.poll();
386
387 if !results.is_empty() {
388 // Clean up ghost responses (force-cancelled while in-flight)
389 // and release concurrency slots for real completions.
390 let mut cancelled = self.cancelled_in_flight.lock().ok();
391 let mut in_flight = self.in_flight_urls.lock().ok();
392
393 for (url, _) in &results {
394 // If this URL was force-cancelled, it is a ghost -- the
395 // slot was already freed in force_cancel. Just clean up
396 // the tracking set.
397 if let Some(ref mut set) = cancelled {
398 if set.remove(url.as_str()) {
399 continue;
400 }
401 }
402 // Real completion: free the concurrency slot.
403 if let Some(ref mut urls) = in_flight {
404 urls.remove(url.as_str());
405 }
406 }
407 drop(cancelled);
408 drop(in_flight);
409
410 // Remove completed URLs from the dedup set.
411 if let Ok(mut known) = self.known_urls.lock() {
412 for (url, _) in &results {
413 known.remove(url);
414 }
415 }
416 }
417
418 // Fill any freed slots from the queue.
419 self.flush();
420
421 results
422 }
423}
424
425// ---------------------------------------------------------------------------
426// Tests
427// ---------------------------------------------------------------------------
428
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::shared_http_client::SharedHttpClient;
    use std::sync::Arc;

    // -- Helpers ----------------------------------------------------------

    /// Mock that instantly completes every request on the next `poll`.
    struct InstantMockClient {
        sent: Mutex<Vec<String>>,
    }

    impl InstantMockClient {
        fn new() -> Self {
            Self {
                sent: Mutex::new(Vec::new()),
            }
        }
    }

    impl HttpClient for InstantMockClient {
        fn send(&self, request: HttpRequest) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let sent = std::mem::take(&mut *self.sent.lock().unwrap());
            sent.into_iter()
                .map(|url| {
                    (
                        url,
                        Ok(HttpResponse {
                            status: 200,
                            body: Vec::new(),
                            headers: Vec::new(),
                        }),
                    )
                })
                .collect()
        }
    }

    /// Mock that records send order but never completes.
    struct DeferredMockClient {
        sent: Mutex<Vec<String>>,
    }

    impl DeferredMockClient {
        fn new() -> Self {
            Self {
                sent: Mutex::new(Vec::new()),
            }
        }
    }

    impl HttpClient for DeferredMockClient {
        fn send(&self, request: HttpRequest) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            Vec::new()
        }
    }

    /// Shared mock for inspecting send order after handing ownership
    /// to FetchPool.
    struct SharedMock(Arc<Mutex<Vec<String>>>);

    impl HttpClient for SharedMock {
        fn send(&self, request: HttpRequest) {
            self.0.lock().unwrap().push(request.url);
        }
        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            Vec::new()
        }
    }

    /// Mock whose sends are recorded and whose completions are delivered
    /// only when the test calls [`RecordingClient::complete`]. Clones share
    /// state, so a test can keep a handle after boxing one into the pool.
    #[derive(Clone, Default)]
    struct RecordingClient {
        sent: Arc<Mutex<Vec<String>>>,
        responses: Arc<Mutex<Vec<(String, Result<HttpResponse, String>)>>>,
    }

    impl RecordingClient {
        fn sent_urls(&self) -> Vec<String> {
            self.sent.lock().unwrap().clone()
        }

        /// Queue a successful response for `url`, returned by the next `poll`.
        fn complete(&self, url: &str) {
            self.responses.lock().unwrap().push((
                url.to_owned(),
                Ok(HttpResponse {
                    status: 200,
                    body: Vec::new(),
                    headers: Vec::new(),
                }),
            ));
        }
    }

    impl HttpClient for RecordingClient {
        fn send(&self, request: HttpRequest) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            std::mem::take(&mut *self.responses.lock().unwrap())
        }
    }

    // -- Concurrency limit ------------------------------------------------

    #[test]
    fn respects_concurrency_limit() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.enqueue(HttpRequest::get("c"), 3.0);
        pool.flush();

        // Only 2 sent, 1 remains queued.
        assert_eq!(pool.in_flight_count(), 2);
        assert_eq!(pool.queued_count(), 1);
    }

    #[test]
    fn freed_slots_dispatch_queued() {
        let pool = FetchPool::new(Box::new(InstantMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush();

        // 'a' sent (lower priority value), 'b' queued.
        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(pool.queued_count(), 1);

        // Poll completes 'a', implicit flush sends 'b'.
        let results = pool.poll();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "a");
        assert_eq!(pool.queued_count(), 0);
        assert_eq!(pool.in_flight_count(), 1); // 'b' now in-flight
    }

    // -- Priority ordering ------------------------------------------------

    #[test]
    fn priority_order_nearest_first() {
        let sent = Arc::new(Mutex::new(Vec::new()));
        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);

        // Enqueue in non-priority order.
        pool.enqueue(HttpRequest::get("far"), 100.0);
        pool.enqueue(HttpRequest::get("near"), 1.0);
        pool.enqueue(HttpRequest::get("mid"), 50.0);

        // Flush dispatches all three; heap sorts by priority.
        pool.flush();

        let order = sent.lock().unwrap().clone();
        assert_eq!(order.len(), 3);
        assert_eq!(order[0], "near");
        assert_eq!(order[1], "mid");
        assert_eq!(order[2], "far");
    }

    #[test]
    fn equal_priority_preserves_enqueue_order() {
        let sent = Arc::new(Mutex::new(Vec::new()));
        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);

        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 1.0);
        pool.enqueue(HttpRequest::get("c"), 1.0);
        pool.flush();

        let order = sent.lock().unwrap().clone();
        assert_eq!(order, vec!["a", "b", "c"]);
    }

    // -- Duplicate suppression --------------------------------------------

    #[test]
    fn duplicate_enqueue_ignored() {
        let sent = Arc::new(Mutex::new(Vec::new()));
        let pool = FetchPool::new(Box::new(SharedMock(Arc::clone(&sent))), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("a"), 2.0); // duplicate -- silently dropped
        pool.flush();

        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(sent.lock().unwrap().len(), 1);
    }

    #[test]
    fn duplicate_suppressed_while_in_flight() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush(); // 'a' is now in-flight

        // Try to enqueue 'a' again while it is in-flight.
        pool.enqueue(HttpRequest::get("a"), 2.0);
        assert_eq!(pool.queued_count(), 0, "should not re-queue in-flight URL");
    }

    #[test]
    fn can_re_enqueue_after_completion() {
        let pool = FetchPool::new(Box::new(InstantMockClient::new()), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();

        // Complete it.
        let results = pool.poll();
        assert_eq!(results.len(), 1);

        // Now re-enqueue should work.
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 1);
    }

    // -- Cancel -----------------------------------------------------------

    #[test]
    fn cancel_removes_queued_request() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.enqueue(HttpRequest::get("c"), 3.0);
        pool.flush();

        // 'a' sent (priority 1.0), 'b' and 'c' queued.
        assert_eq!(pool.queued_count(), 2);
        assert!(pool.cancel("b"));
        assert_eq!(pool.queued_count(), 1);
    }

    #[test]
    fn cancel_nonexistent_returns_false() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        assert!(!pool.cancel("nope"));
    }

    #[test]
    fn cancel_in_flight_returns_false() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        // 'a' is in-flight, not queued.
        assert!(!pool.cancel("a"));
    }

    // -- Clear queue ------------------------------------------------------

    #[test]
    fn clear_queue_discards_pending() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.enqueue(HttpRequest::get("c"), 3.0);
        pool.flush();

        assert_eq!(pool.queued_count(), 2);
        pool.clear_queue();
        assert_eq!(pool.queued_count(), 0);
        // In-flight request is unaffected.
        assert_eq!(pool.in_flight_count(), 1);
    }

    #[test]
    fn cleared_urls_can_be_re_enqueued() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush(); // sends 'a', 'b' queued
        pool.clear_queue(); // drops 'b' from queue and known set

        pool.enqueue(HttpRequest::get("b"), 2.0); // should succeed
        assert_eq!(pool.queued_count(), 1);
    }

    // -- Zero-concurrency edge case ---------------------------------------

    #[test]
    fn zero_concurrency_clamped_to_one() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 0);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 1);
    }

    // -- Empty pool -------------------------------------------------------

    #[test]
    fn poll_empty_returns_empty() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        assert!(pool.poll().is_empty());
        assert_eq!(pool.in_flight_count(), 0);
        assert_eq!(pool.queued_count(), 0);
    }

    #[test]
    fn flush_empty_is_noop() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        pool.flush(); // should not panic
        assert_eq!(pool.in_flight_count(), 0);
    }

    // -- Force-cancel ghost slot reclamation ------------------------------

    #[test]
    fn force_cancel_immediately_reclaims_in_flight_slot() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 2);

        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.enqueue(HttpRequest::get("c"), 3.0);
        pool.flush();

        // 'a' and 'b' in-flight, 'c' queued.
        assert_eq!(pool.in_flight_count(), 2);
        assert_eq!(pool.queued_count(), 1);

        // Force-cancel 'a' while it is in-flight.
        pool.force_cancel("a");

        // The slot should be reclaimed immediately.
        assert_eq!(pool.in_flight_count(), 1, "ghost slot must be reclaimed immediately");
        assert_eq!(pool.cancelled_in_flight_count(), 1);

        // Flush should now dispatch 'c' into the freed slot.
        pool.flush();
        assert_eq!(pool.in_flight_count(), 2, "freed slot should accept queued request");
        assert_eq!(pool.queued_count(), 0);
    }

    #[test]
    fn force_cancel_removes_queued_request() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 1);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush(); // 'a' in-flight, 'b' queued

        // 'b' was never sent, so it is simply dropped: no ghost marker and
        // no effect on the in-flight set.
        pool.force_cancel("b");
        assert_eq!(pool.queued_count(), 0);
        assert!(!pool.is_known("b"));
        assert_eq!(pool.cancelled_in_flight_count(), 0);
        assert_eq!(pool.in_flight_count(), 1);
    }

    #[test]
    fn ghost_response_does_not_double_decrement_in_flight() {
        let inner = RecordingClient::default();
        let pool = FetchPool::new(Box::new(inner.clone()), 2);

        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 2);

        // Force-cancel 'a': slot reclaimed immediately.
        pool.force_cancel("a");
        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(pool.cancelled_in_flight_count(), 1);

        // No response has arrived yet, so the ghost marker persists.
        let _ = pool.poll();
        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(pool.cancelled_in_flight_count(), 1, "ghost persists until response arrives");

        // Deliver the ghost response: the slot held by 'b' must be left
        // alone and the ghost marker must be consumed.
        inner.complete("a");
        let _ = pool.poll();
        assert_eq!(pool.in_flight_count(), 1, "ghost response must not release another slot");
        assert_eq!(pool.cancelled_in_flight_count(), 0, "ghost marker cleared by its response");
    }

    #[test]
    fn force_cancel_allows_re_enqueue_of_in_flight_url() {
        let pool = FetchPool::new(Box::new(DeferredMockClient::new()), 10);
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 1);

        pool.force_cancel("a");
        assert_eq!(pool.in_flight_count(), 0);

        // Should be able to re-enqueue immediately.
        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 1);
    }

    #[test]
    fn reenqueue_after_force_cancel_with_shared_dedup_completes_cleanly() {
        let inner = RecordingClient::default();
        let shared = SharedHttpClient::new(Box::new(inner.clone()));
        let pool = FetchPool::new(Box::new(shared), 2);

        pool.enqueue(HttpRequest::get("a"), 1.0);
        pool.enqueue(HttpRequest::get("b"), 2.0);
        pool.flush();
        assert_eq!(pool.in_flight_count(), 2);
        assert_eq!(inner.sent_urls(), vec!["a".to_string(), "b".to_string()]);

        // Cancel the in-flight request and immediately re-enqueue the same URL.
        pool.force_cancel("a");
        assert_eq!(pool.in_flight_count(), 1);
        assert_eq!(pool.cancelled_in_flight_count(), 1);

        pool.enqueue(HttpRequest::get("a"), 0.5);
        pool.flush();

        // SharedHttpClient should dedup the resend against the original
        // in-flight request, so no second network send for "a" occurs.
        assert_eq!(pool.in_flight_count(), 2);
        assert_eq!(inner.sent_urls(), vec!["a".to_string(), "b".to_string()]);
        assert_eq!(pool.cancelled_in_flight_count(), 0, "re-enqueue should clear ghost marker");

        // When the original response for "a" arrives, it must satisfy the new
        // logical request rather than being discarded as a ghost.
        inner.complete("a");
        let results = pool.poll();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "a");
        assert_eq!(pool.in_flight_count(), 1, "completion should retire the re-enqueued URL");
    }
}