Skip to main content

rustial_engine/io/
shared_http_client.rs

1//! Cross-source HTTP request deduplication layer.
2//!
3//! [`SharedHttpClient`] wraps an inner [`HttpClient`] and guarantees
4//! that concurrent requests for the **same URL** result in a single
5//! network round-trip.  When multiple tile sources (raster, vector,
6//! terrain) request the same tile URL within the same frame window,
7//! only one HTTP request is sent and the response is cloned to every
8//! waiting caller.
9//!
10//! # Motivation
11//!
12//! In MapLibre GL JS, `SourceCache` batches requests across sources
13//! that share the same tile endpoint.  Without dedup, a raster layer
14//! and a vector layer that happen to request the same `{z}/{x}/{y}`
15//! tile from the same server produce two independent HTTP fetches.
16//! This wastes bandwidth and connection slots -- particularly harmful
17//! on mobile connections and during fly-to animations where many tiles
18//! are requested simultaneously.
19//!
20//! # Architecture
21//!
22//! ```text
23//! Source A (FetchPool)                 Source B (FetchPool)
24//!        |                                     |
25//!        v                                     v
26//!    SharedHttpClient (subscriber 0)    SharedHttpClient (subscriber 1)
27//!               \                           /
//!                --- SharedState (Arc) -----
29//!                         |
30//!                         v
31//!                   Inner HttpClient
32//! ```
33//!
34//! Each [`SharedHttpClient`] instance carries a unique subscriber ID.
35//! When [`send`](HttpClient::send) is called:
36//!
37//! 1. If no in-flight request exists for this URL, the request is
38//!    forwarded to the inner client and the subscriber is registered.
39//! 2. If an in-flight request already exists, the subscriber is added
40//!    to the waiting list without issuing a duplicate fetch.
41//!
42//! When [`poll`](HttpClient::poll) is called, the inner client is
43//! polled and completed responses are distributed to every subscriber
44//! that requested that URL.  Each subscriber's poll returns only its
45//! own results.
46//!
47//! # Thread safety
48//!
//! All internal state is behind a single `Mutex<SharedState>`, so a
//! `SharedHttpClient` is `Send + Sync` whenever the boxed inner client is `Send`.
51
52use super::{HttpClient, HttpRequest, HttpResponse};
53use std::collections::{HashMap, VecDeque};
54use std::sync::{Arc, Mutex};
55
56// ---------------------------------------------------------------------------
57// Subscriber tracking
58// ---------------------------------------------------------------------------
59
/// Numeric identity of one `SharedHttpClient` handle.
///
/// The handle produced by `SharedHttpClient::new` is `0`; each clone is
/// given the next unused value.
type SubscriberId = u64;
62
63/// Per-URL tracking of which subscribers are waiting for the response.
64struct InflightEntry {
65    /// Subscriber IDs that have requested this URL.
66    subscribers: Vec<SubscriberId>,
67    /// The original request (kept for diagnostics / logging).
68    /// We only need the URL, but storing the full request allows
69    /// future extensions (e.g. header merging).
70    #[allow(dead_code)]
71    url: String,
72}
73
74// ---------------------------------------------------------------------------
// SharedState
76// ---------------------------------------------------------------------------
77
78/// Shared mutable state protected by a mutex.
79struct SharedState {
80    /// The underlying HTTP transport.
81    client: Box<dyn HttpClient>,
82
83    /// Next subscriber ID to assign on clone.
84    next_subscriber_id: SubscriberId,
85
86    /// URLs currently in-flight, keyed by URL string.
87    /// Each entry records which subscribers are waiting for the result.
88    inflight: HashMap<String, InflightEntry>,
89
90    /// Per-subscriber outbox of completed responses.
91    /// Responses are moved here when the inner client completes them
92    /// and picked up by the corresponding subscriber's next `poll()`.
93    outboxes: HashMap<SubscriberId, VecDeque<(String, Result<HttpResponse, String>)>>,
94}
95
96// ---------------------------------------------------------------------------
97// SharedHttpClient
98// ---------------------------------------------------------------------------
99
100/// A deduplicating HTTP client wrapper that can be cheaply cloned.
101///
102/// Every clone shares the same inner HTTP client and in-flight
103/// tracking state, but each clone has a unique subscriber identity
104/// so that [`poll`](HttpClient::poll) returns only the responses
105/// relevant to that subscriber.
106///
107/// # Construction
108///
109/// ```rust,ignore
110/// use rustial_engine::{SharedHttpClient, HttpClient};
111///
112/// let inner: Box<dyn HttpClient> = /* host-provided client */;
113/// let shared = SharedHttpClient::new(inner);
114///
115/// // Hand clones to different tile sources:
116/// let raster_client: Box<dyn HttpClient> = Box::new(shared.clone());
117/// let vector_client: Box<dyn HttpClient> = Box::new(shared.clone());
118/// ```
119pub struct SharedHttpClient {
120    /// The subscriber identity of *this* handle.
121    subscriber_id: SubscriberId,
122    /// Shared state (inner client + inflight map + outboxes).
123    state: Arc<Mutex<SharedState>>,
124}
125
126impl SharedHttpClient {
127    /// Wrap an existing HTTP client with cross-source deduplication.
128    ///
129    /// The returned handle is subscriber 0.  Call [`clone`](Clone::clone)
130    /// to create additional subscribers.
131    pub fn new(client: Box<dyn HttpClient>) -> Self {
132        let mut outboxes = HashMap::new();
133        outboxes.insert(0, VecDeque::new());
134
135        Self {
136            subscriber_id: 0,
137            state: Arc::new(Mutex::new(SharedState {
138                client,
139                next_subscriber_id: 1,
140                inflight: HashMap::new(),
141                outboxes,
142            })),
143        }
144    }
145}
146
147impl Clone for SharedHttpClient {
148    /// Create a new subscriber handle that shares the same inner client.
149    ///
150    /// The new handle has a distinct subscriber ID so its `poll()`
151    /// returns only its own responses.
152    fn clone(&self) -> Self {
153        let mut state = self.state.lock().expect("SharedHttpClient lock poisoned");
154        let id = state.next_subscriber_id;
155        state.next_subscriber_id += 1;
156        state.outboxes.insert(id, VecDeque::new());
157        Self {
158            subscriber_id: id,
159            state: Arc::clone(&self.state),
160        }
161    }
162}
163
164impl std::fmt::Debug for SharedHttpClient {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        let (inflight, subscribers) = self
167            .state
168            .lock()
169            .map(|s| (s.inflight.len(), s.outboxes.len()))
170            .unwrap_or((0, 0));
171        f.debug_struct("SharedHttpClient")
172            .field("subscriber_id", &self.subscriber_id)
173            .field("inflight_urls", &inflight)
174            .field("total_subscribers", &subscribers)
175            .finish()
176    }
177}
178
179impl HttpClient for SharedHttpClient {
180    /// Send an HTTP request, deduplicating against in-flight requests.
181    ///
182    /// If another subscriber has already sent a request for the same
183    /// URL, this subscriber is added to the waiting list without
184    /// issuing a duplicate fetch to the inner client.
185    fn send(&self, request: HttpRequest) {
186        let mut state = match self.state.lock() {
187            Ok(s) => s,
188            Err(_) => return,
189        };
190
191        let url = request.url.clone();
192
193        if let Some(entry) = state.inflight.get_mut(&url) {
194            // Another subscriber already has this URL in-flight.
195            // Register ourselves as an additional recipient.
196            if !entry.subscribers.contains(&self.subscriber_id) {
197                entry.subscribers.push(self.subscriber_id);
198            }
199            return;
200        }
201
202        // No in-flight request for this URL -- issue the actual fetch.
203        state.inflight.insert(
204            url.clone(),
205            InflightEntry {
206                subscribers: vec![self.subscriber_id],
207                url,
208            },
209        );
210
211        state.client.send(request);
212    }
213
214    /// Poll for completed responses belonging to this subscriber.
215    ///
216    /// Internally polls the shared inner client and distributes
217    /// completed responses to all subscribers that requested each URL.
218    /// Returns only the responses destined for *this* subscriber.
219    fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
220        let mut state = match self.state.lock() {
221            Ok(s) => s,
222            Err(_) => return Vec::new(),
223        };
224
225        // Drain the inner client's completed responses.
226        let completed = state.client.poll();
227        for (url, result) in completed {
228            if let Some(entry) = state.inflight.remove(&url) {
229                // Fan out the result to every subscriber that requested
230                // this URL.  The last subscriber gets the original;
231                // earlier ones get clones.
232                let subscriber_count = entry.subscribers.len();
233                for (i, &sub_id) in entry.subscribers.iter().enumerate() {
234                    let cloned_result = if i + 1 == subscriber_count {
235                        // Last subscriber takes ownership (avoids one clone).
236                        clone_result(&result)
237                    } else {
238                        clone_result(&result)
239                    };
240                    if let Some(outbox) = state.outboxes.get_mut(&sub_id) {
241                        outbox.push_back((url.clone(), cloned_result));
242                    }
243                }
244            }
245            // If no inflight entry exists, the response is orphaned
246            // (e.g. the subscriber was dropped).  Silently discard.
247        }
248
249        // Return only this subscriber's buffered results.
250        let outbox = match state.outboxes.get_mut(&self.subscriber_id) {
251            Some(o) => o,
252            None => return Vec::new(),
253        };
254        outbox.drain(..).collect()
255    }
256}
257
258/// Clone an HTTP result (response body + headers are heap-allocated,
259/// so this is a memcpy but avoids re-fetching from the network).
260fn clone_result(
261    result: &Result<HttpResponse, String>,
262) -> Result<HttpResponse, String> {
263    match result {
264        Ok(response) => Ok(HttpResponse {
265            status: response.status,
266            body: response.body.clone(),
267            headers: response.headers.clone(),
268        }),
269        Err(e) => Err(e.clone()),
270    }
271}
272
273// ---------------------------------------------------------------------------
274// Tests
275// ---------------------------------------------------------------------------
276
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex as StdMutex;

    /// Shorthand for what `HttpClient::poll` yields.
    type Completed = Vec<(String, Result<HttpResponse, String>)>;

    /// A test HTTP client that records the URL of every request it is
    /// asked to send and returns whatever responses the test injected.
    #[derive(Default)]
    struct MockHttpClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Completed>,
    }

    impl MockHttpClient {
        /// URLs passed to `send` so far, in call order.
        fn sent_urls(&self) -> Vec<String> {
            self.sent.lock().unwrap().clone()
        }

        /// Queue a successful response for the next `poll()`.
        fn inject_response(&self, url: &str, status: u16, body: &[u8]) {
            self.responses.lock().unwrap().push((
                url.to_owned(),
                Ok(HttpResponse {
                    status,
                    body: body.to_vec(),
                    headers: Vec::new(),
                }),
            ));
        }

        /// Queue a transport error for the next `poll()`.
        fn inject_error(&self, url: &str, message: &str) {
            self.responses
                .lock()
                .unwrap()
                .push((url.to_owned(), Err(message.to_owned())));
        }
    }

    impl HttpClient for MockHttpClient {
        fn send(&self, request: HttpRequest) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Completed {
            self.responses.lock().unwrap().drain(..).collect()
        }
    }

    /// Lets a test keep one `Arc` to the mock while the client under
    /// test owns another.
    impl HttpClient for Arc<MockHttpClient> {
        fn send(&self, request: HttpRequest) {
            (**self).send(request);
        }

        fn poll(&self) -> Completed {
            (**self).poll()
        }
    }

    /// Fresh mock plus the root `SharedHttpClient` wrapped around it.
    fn setup() -> (Arc<MockHttpClient>, SharedHttpClient) {
        let mock = Arc::new(MockHttpClient::default());
        let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
        (mock, SharedHttpClient::new(inner))
    }

    /// Assert that `results` is exactly one successful response for
    /// `url` carrying `body`.
    fn assert_single_ok(results: &Completed, url: &str, body: &[u8]) {
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, url);
        assert_eq!(results[0].1.as_ref().unwrap().body, body);
    }

    #[test]
    fn dedup_same_url_across_subscribers() {
        let (mock, shared) = setup();
        let sub_a = shared.clone();
        let sub_b = shared.clone();

        // Both subscribers request the same URL.
        sub_a.send(HttpRequest::get("https://tiles.example.com/5/10/12.pbf"));
        sub_b.send(HttpRequest::get("https://tiles.example.com/5/10/12.pbf"));

        // Only one HTTP request should have been sent.
        assert_eq!(mock.sent_urls().len(), 1);
    }

    #[test]
    fn different_urls_are_not_deduped() {
        let (mock, shared) = setup();
        let sub_a = shared.clone();

        sub_a.send(HttpRequest::get("https://example.com/a.png"));
        sub_a.send(HttpRequest::get("https://example.com/b.png"));

        assert_eq!(mock.sent_urls().len(), 2);
    }

    #[test]
    fn response_fanned_out_to_all_subscribers() {
        let (mock, shared) = setup();
        let sub_a = shared.clone();
        let sub_b = shared.clone();

        let url = "https://tiles.example.com/5/10/12.pbf";
        sub_a.send(HttpRequest::get(url));
        sub_b.send(HttpRequest::get(url));

        // Inject the response as if the inner client completed it.
        mock.inject_response(url, 200, b"tile-data");

        // sub_a polls -- this drives the inner poll and distributes.
        assert_single_ok(&sub_a.poll(), url, b"tile-data");

        // sub_b polls -- picks up its copy from the outbox.
        assert_single_ok(&sub_b.poll(), url, b"tile-data");
    }

    #[test]
    fn subscriber_only_sees_own_responses() {
        let (mock, shared) = setup();
        let sub_a = shared.clone();
        let sub_b = shared.clone();

        // Each subscriber requests a different URL.
        sub_a.send(HttpRequest::get("https://example.com/a.png"));
        sub_b.send(HttpRequest::get("https://example.com/b.png"));

        mock.inject_response("https://example.com/a.png", 200, b"data-a");
        mock.inject_response("https://example.com/b.png", 200, b"data-b");

        // sub_a polls first, driving the inner poll.
        assert_single_ok(&sub_a.poll(), "https://example.com/a.png", b"data-a");

        // sub_b picks up its result.
        assert_single_ok(&sub_b.poll(), "https://example.com/b.png", b"data-b");
    }

    #[test]
    fn error_response_fanned_out() {
        let (mock, shared) = setup();
        let sub_a = shared.clone();
        let sub_b = shared.clone();

        let url = "https://tiles.example.com/err";
        sub_a.send(HttpRequest::get(url));
        sub_b.send(HttpRequest::get(url));

        mock.inject_error(url, "connection refused");

        let results_a = sub_a.poll();
        assert_eq!(results_a.len(), 1);
        assert!(results_a[0].1.is_err());

        let results_b = sub_b.poll();
        assert_eq!(results_b.len(), 1);
        assert!(results_b[0].1.is_err());
    }

    #[test]
    fn second_request_after_completion_issues_new_fetch() {
        let (mock, shared) = setup();
        let sub_a = shared.clone();

        let url = "https://tiles.example.com/5/10/12.pbf";
        sub_a.send(HttpRequest::get(url));
        mock.inject_response(url, 200, b"first");
        let _ = sub_a.poll();

        // The URL is no longer in-flight, so a second request should
        // issue a new fetch.
        sub_a.send(HttpRequest::get(url));
        assert_eq!(mock.sent_urls().len(), 2);
    }

    #[test]
    fn repeated_send_from_same_subscriber_yields_single_response() {
        let (mock, shared) = setup();
        let sub_a = shared.clone();

        let url = "https://tiles.example.com/5/10/12.pbf";
        sub_a.send(HttpRequest::get(url));
        sub_a.send(HttpRequest::get(url));
        assert_eq!(mock.sent_urls().len(), 1);

        // The subscriber is registered once, so it gets one copy.
        mock.inject_response(url, 200, b"tile-data");
        assert_single_ok(&sub_a.poll(), url, b"tile-data");
        assert!(sub_a.poll().is_empty());
    }

    #[test]
    fn unrequested_response_is_discarded() {
        let (mock, shared) = setup();
        let sub_a = shared.clone();

        // Nobody asked for this URL through the shared client.
        mock.inject_response("https://example.com/stray.png", 200, b"stray");

        assert!(sub_a.poll().is_empty());
        assert!(shared.poll().is_empty());
    }

    #[test]
    fn root_handle_is_a_subscriber() {
        let (mock, shared) = setup();
        let sub_a = shared.clone();

        let url = "https://example.com/root.png";
        shared.send(HttpRequest::get(url));
        mock.inject_response(url, 200, b"root-data");

        // Any handle's poll distributes; only the requester receives.
        assert!(sub_a.poll().is_empty());
        assert_single_ok(&shared.poll(), url, b"root-data");
    }
}