Skip to main content

rustial_engine/io/
shared_http_client.rs

1//! Cross-source HTTP request deduplication layer.
2//!
3//! [`SharedHttpClient`] wraps an inner [`HttpClient`] and guarantees
4//! that concurrent requests for the **same URL** result in a single
5//! network round-trip.  When multiple tile sources (raster, vector,
6//! terrain) request the same tile URL within the same frame window,
7//! only one HTTP request is sent and the response is cloned to every
8//! waiting caller.
9//!
10//! # Motivation
11//!
12//! In MapLibre GL JS, `SourceCache` batches requests across sources
13//! that share the same tile endpoint.  Without dedup, a raster layer
14//! and a vector layer that happen to request the same `{z}/{x}/{y}`
15//! tile from the same server produce two independent HTTP fetches.
16//! This wastes bandwidth and connection slots -- particularly harmful
17//! on mobile connections and during fly-to animations where many tiles
18//! are requested simultaneously.
19//!
20//! # Architecture
21//!
22//! ```text
23//! Source A (FetchPool)                 Source B (FetchPool)
24//!        |                                     |
25//!        v                                     v
26//!    SharedHttpClient (subscriber 0)    SharedHttpClient (subscriber 1)
27//!               \                           /
//!                --- SharedState (Arc) -----
29//!                         |
30//!                         v
31//!                   Inner HttpClient
32//! ```
33//!
34//! Each [`SharedHttpClient`] instance carries a unique subscriber ID.
35//! When [`send`](HttpClient::send) is called:
36//!
37//! 1. If no in-flight request exists for this URL, the request is
38//!    forwarded to the inner client and the subscriber is registered.
39//! 2. If an in-flight request already exists, the subscriber is added
40//!    to the waiting list without issuing a duplicate fetch.
41//!
42//! When [`poll`](HttpClient::poll) is called, the inner client is
43//! polled and completed responses are distributed to every subscriber
44//! that requested that URL.  Each subscriber's poll returns only its
45//! own results.
46//!
47//! # Thread safety
48//!
//! All internal state is behind a single `Mutex<SharedState>`, which
//! makes `SharedHttpClient` `Send + Sync` provided the boxed inner
//! client is itself `Send`.
51
52use super::{HttpClient, HttpRequest, HttpResponse};
53use std::collections::{HashMap, VecDeque};
54use std::sync::{Arc, Mutex};
55
56// ---------------------------------------------------------------------------
57// Subscriber tracking
58// ---------------------------------------------------------------------------
59
/// Identifies a single `SharedHttpClient` handle.  `new` assigns 0 and
/// every `clone` takes the next unused value.
type SubscriberId = u64;
62
63/// Per-URL tracking of which subscribers are waiting for the response.
64struct InflightEntry {
65    /// Subscriber IDs that have requested this URL.
66    subscribers: Vec<SubscriberId>,
67    /// The original request (kept for diagnostics / logging).
68    /// We only need the URL, but storing the full request allows
69    /// future extensions (e.g. header merging).
70    #[allow(dead_code)]
71    url: String,
72}
73
74// ---------------------------------------------------------------------------
// SharedState
76// ---------------------------------------------------------------------------
77
78/// Shared mutable state protected by a mutex.
79struct SharedState {
80    /// The underlying HTTP transport.
81    client: Box<dyn HttpClient>,
82
83    /// Next subscriber ID to assign on clone.
84    next_subscriber_id: SubscriberId,
85
86    /// URLs currently in-flight, keyed by URL string.
87    /// Each entry records which subscribers are waiting for the result.
88    inflight: HashMap<String, InflightEntry>,
89
90    /// Per-subscriber outbox of completed responses.
91    /// Responses are moved here when the inner client completes them
92    /// and picked up by the corresponding subscriber's next `poll()`.
93    #[allow(clippy::type_complexity)]
94    outboxes: HashMap<SubscriberId, VecDeque<(String, Result<HttpResponse, String>)>>,
95}
96
97// ---------------------------------------------------------------------------
98// SharedHttpClient
99// ---------------------------------------------------------------------------
100
101/// A deduplicating HTTP client wrapper that can be cheaply cloned.
102///
103/// Every clone shares the same inner HTTP client and in-flight
104/// tracking state, but each clone has a unique subscriber identity
105/// so that [`poll`](HttpClient::poll) returns only the responses
106/// relevant to that subscriber.
107///
108/// # Construction
109///
110/// ```rust,ignore
111/// use rustial_engine::{SharedHttpClient, HttpClient};
112///
113/// let inner: Box<dyn HttpClient> = /* host-provided client */;
114/// let shared = SharedHttpClient::new(inner);
115///
116/// // Hand clones to different tile sources:
117/// let raster_client: Box<dyn HttpClient> = Box::new(shared.clone());
118/// let vector_client: Box<dyn HttpClient> = Box::new(shared.clone());
119/// ```
120pub struct SharedHttpClient {
121    /// The subscriber identity of *this* handle.
122    subscriber_id: SubscriberId,
123    /// Shared state (inner client + inflight map + outboxes).
124    state: Arc<Mutex<SharedState>>,
125}
126
127impl SharedHttpClient {
128    /// Wrap an existing HTTP client with cross-source deduplication.
129    ///
130    /// The returned handle is subscriber 0.  Call [`clone`](Clone::clone)
131    /// to create additional subscribers.
132    pub fn new(client: Box<dyn HttpClient>) -> Self {
133        let mut outboxes = HashMap::new();
134        outboxes.insert(0, VecDeque::new());
135
136        Self {
137            subscriber_id: 0,
138            state: Arc::new(Mutex::new(SharedState {
139                client,
140                next_subscriber_id: 1,
141                inflight: HashMap::new(),
142                outboxes,
143            })),
144        }
145    }
146}
147
148impl Clone for SharedHttpClient {
149    /// Create a new subscriber handle that shares the same inner client.
150    ///
151    /// The new handle has a distinct subscriber ID so its `poll()`
152    /// returns only its own responses.
153    fn clone(&self) -> Self {
154        let mut state = self.state.lock().expect("SharedHttpClient lock poisoned");
155        let id = state.next_subscriber_id;
156        state.next_subscriber_id += 1;
157        state.outboxes.insert(id, VecDeque::new());
158        Self {
159            subscriber_id: id,
160            state: Arc::clone(&self.state),
161        }
162    }
163}
164
165impl std::fmt::Debug for SharedHttpClient {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        let (inflight, subscribers) = self
168            .state
169            .lock()
170            .map(|s| (s.inflight.len(), s.outboxes.len()))
171            .unwrap_or((0, 0));
172        f.debug_struct("SharedHttpClient")
173            .field("subscriber_id", &self.subscriber_id)
174            .field("inflight_urls", &inflight)
175            .field("total_subscribers", &subscribers)
176            .finish()
177    }
178}
179
180impl HttpClient for SharedHttpClient {
181    /// Send an HTTP request, deduplicating against in-flight requests.
182    ///
183    /// If another subscriber has already sent a request for the same
184    /// URL, this subscriber is added to the waiting list without
185    /// issuing a duplicate fetch to the inner client.
186    fn send(&self, request: HttpRequest) {
187        let mut state = match self.state.lock() {
188            Ok(s) => s,
189            Err(_) => return,
190        };
191
192        let url = request.url.clone();
193
194        if let Some(entry) = state.inflight.get_mut(&url) {
195            // Another subscriber already has this URL in-flight.
196            // Register ourselves as an additional recipient.
197            if !entry.subscribers.contains(&self.subscriber_id) {
198                entry.subscribers.push(self.subscriber_id);
199            }
200            return;
201        }
202
203        // No in-flight request for this URL -- issue the actual fetch.
204        state.inflight.insert(
205            url.clone(),
206            InflightEntry {
207                subscribers: vec![self.subscriber_id],
208                url,
209            },
210        );
211
212        state.client.send(request);
213    }
214
215    /// Poll for completed responses belonging to this subscriber.
216    ///
217    /// Internally polls the shared inner client and distributes
218    /// completed responses to all subscribers that requested each URL.
219    /// Returns only the responses destined for *this* subscriber.
220    fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
221        let mut state = match self.state.lock() {
222            Ok(s) => s,
223            Err(_) => return Vec::new(),
224        };
225
226        // Drain the inner client's completed responses.
227        let completed = state.client.poll();
228        for (url, result) in completed {
229            if let Some(entry) = state.inflight.remove(&url) {
230                // Fan out the result to every subscriber that requested
231                // this URL.  The last subscriber gets the original;
232                // earlier ones get clones.
233                let subscriber_count = entry.subscribers.len();
234                for (i, &sub_id) in entry.subscribers.iter().enumerate() {
235                    let cloned_result = if i + 1 == subscriber_count {
236                        // Last subscriber takes ownership (avoids one clone).
237                        clone_result(&result)
238                    } else {
239                        clone_result(&result)
240                    };
241                    if let Some(outbox) = state.outboxes.get_mut(&sub_id) {
242                        outbox.push_back((url.clone(), cloned_result));
243                    }
244                }
245            }
246            // If no inflight entry exists, the response is orphaned
247            // (e.g. the subscriber was dropped).  Silently discard.
248        }
249
250        // Return only this subscriber's buffered results.
251        let outbox = match state.outboxes.get_mut(&self.subscriber_id) {
252            Some(o) => o,
253            None => return Vec::new(),
254        };
255        outbox.drain(..).collect()
256    }
257}
258
259/// Clone an HTTP result (response body + headers are heap-allocated,
260/// so this is a memcpy but avoids re-fetching from the network).
261fn clone_result(result: &Result<HttpResponse, String>) -> Result<HttpResponse, String> {
262    match result {
263        Ok(response) => Ok(HttpResponse {
264            status: response.status,
265            body: response.body.clone(),
266            headers: response.headers.clone(),
267        }),
268        Err(e) => Err(e.clone()),
269    }
270}
271
272// ---------------------------------------------------------------------------
273// Tests
274// ---------------------------------------------------------------------------
275
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex as StdMutex;

    /// One `(url, outcome)` pair in the shape returned by `poll`.
    type Completed = (String, Result<HttpResponse, String>);

    /// Stand-in for the inner client: remembers every URL passed to
    /// `send` and replays whatever the test queued on the next `poll`.
    #[derive(Default)]
    struct MockHttpClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Vec<Completed>>,
    }

    impl MockHttpClient {
        /// Snapshot of the URLs sent so far, in order.
        fn sent_urls(&self) -> Vec<String> {
            let log = self.sent.lock().unwrap();
            log.to_vec()
        }

        /// Queue a successful response for `url`.
        fn inject_response(&self, url: &str, status: u16, body: &[u8]) {
            let response = HttpResponse {
                status,
                body: body.to_vec(),
                headers: Vec::new(),
            };
            self.responses
                .lock()
                .unwrap()
                .push((url.to_owned(), Ok(response)));
        }

        /// Queue a transport-level failure for `url`.
        fn inject_error(&self, url: &str, message: &str) {
            self.responses
                .lock()
                .unwrap()
                .push((url.to_owned(), Err(message.into())));
        }
    }

    impl HttpClient for MockHttpClient {
        fn send(&self, request: HttpRequest) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<Completed> {
            std::mem::take(&mut *self.responses.lock().unwrap())
        }
    }

    /// Lets the test keep one `Arc` to the mock while the client under
    /// test owns another.
    impl HttpClient for Arc<MockHttpClient> {
        fn send(&self, request: HttpRequest) {
            (**self).send(request);
        }

        fn poll(&self) -> Vec<Completed> {
            (**self).poll()
        }
    }

    /// A fresh mock plus the root handle (subscriber 0) wrapping it.
    fn fixture() -> (Arc<MockHttpClient>, SharedHttpClient) {
        let mock = Arc::new(MockHttpClient::default());
        let shared = SharedHttpClient::new(Box::new(Arc::clone(&mock)));
        (mock, shared)
    }

    /// Expect exactly one successful result for `url` carrying `body`.
    fn assert_single_ok(results: &[Completed], url: &str, body: &[u8]) {
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, url);
        assert_eq!(results[0].1.as_ref().unwrap().body, body);
    }

    /// Expect exactly one result, and that it is an error.
    fn assert_single_err(results: &[Completed]) {
        assert_eq!(results.len(), 1);
        assert!(results[0].1.is_err());
    }

    #[test]
    fn dedup_same_url_across_subscribers() {
        let (mock, shared) = fixture();
        let sub_a = shared.clone();
        let sub_b = shared.clone();
        let url = "https://tiles.example.com/5/10/12.pbf";

        // Two subscribers, one URL ...
        sub_a.send(HttpRequest::get(url));
        sub_b.send(HttpRequest::get(url));

        // ... and a single request reaching the inner client.
        assert_eq!(mock.sent_urls().len(), 1);
    }

    #[test]
    fn different_urls_are_not_deduped() {
        let (mock, shared) = fixture();
        let sub_a = shared.clone();

        sub_a.send(HttpRequest::get("https://example.com/a.png"));
        sub_a.send(HttpRequest::get("https://example.com/b.png"));

        assert_eq!(mock.sent_urls().len(), 2);
    }

    #[test]
    fn response_fanned_out_to_all_subscribers() {
        let (mock, shared) = fixture();
        let sub_a = shared.clone();
        let sub_b = shared.clone();
        let url = "https://tiles.example.com/5/10/12.pbf";

        sub_a.send(HttpRequest::get(url));
        sub_b.send(HttpRequest::get(url));

        // Pretend the inner client finished the fetch.
        mock.inject_response(url, 200, b"tile-data");

        // The first poll drives the inner client and distributes.
        assert_single_ok(&sub_a.poll(), url, b"tile-data");

        // The second subscriber finds its copy already in its outbox.
        assert_single_ok(&sub_b.poll(), url, b"tile-data");
    }

    #[test]
    fn subscriber_only_sees_own_responses() {
        let (mock, shared) = fixture();
        let sub_a = shared.clone();
        let sub_b = shared.clone();
        let url_a = "https://example.com/a.png";
        let url_b = "https://example.com/b.png";

        // Each subscriber asks for its own URL.
        sub_a.send(HttpRequest::get(url_a));
        sub_b.send(HttpRequest::get(url_b));

        mock.inject_response(url_a, 200, b"data-a");
        mock.inject_response(url_b, 200, b"data-b");

        // sub_a polls first and thereby drives the inner poll.
        let results_a = sub_a.poll();
        assert_eq!(results_a.len(), 1);
        assert_eq!(results_a[0].0, url_a);

        // sub_b then collects only its own result.
        let results_b = sub_b.poll();
        assert_eq!(results_b.len(), 1);
        assert_eq!(results_b[0].0, url_b);
    }

    #[test]
    fn error_response_fanned_out() {
        let (mock, shared) = fixture();
        let sub_a = shared.clone();
        let sub_b = shared.clone();
        let url = "https://tiles.example.com/err";

        sub_a.send(HttpRequest::get(url));
        sub_b.send(HttpRequest::get(url));

        mock.inject_error(url, "connection refused");

        assert_single_err(&sub_a.poll());
        assert_single_err(&sub_b.poll());
    }

    #[test]
    fn second_request_after_completion_issues_new_fetch() {
        let (mock, shared) = fixture();
        let sub_a = shared.clone();
        let url = "https://tiles.example.com/5/10/12.pbf";

        sub_a.send(HttpRequest::get(url));
        mock.inject_response(url, 200, b"first");
        let _ = sub_a.poll();

        // Completion cleared the in-flight entry, so asking again must
        // reach the inner client a second time.
        sub_a.send(HttpRequest::get(url));
        assert_eq!(mock.sent_urls().len(), 2);
    }
}