rustial_engine/io/shared_http_client.rs
1//! Cross-source HTTP request deduplication layer.
2//!
3//! [`SharedHttpClient`] wraps an inner [`HttpClient`] and guarantees
4//! that concurrent requests for the **same URL** result in a single
5//! network round-trip. When multiple tile sources (raster, vector,
6//! terrain) request the same tile URL within the same frame window,
7//! only one HTTP request is sent and the response is cloned to every
8//! waiting caller.
9//!
10//! # Motivation
11//!
12//! In MapLibre GL JS, `SourceCache` batches requests across sources
13//! that share the same tile endpoint. Without dedup, a raster layer
14//! and a vector layer that happen to request the same `{z}/{x}/{y}`
15//! tile from the same server produce two independent HTTP fetches.
16//! This wastes bandwidth and connection slots -- particularly harmful
17//! on mobile connections and during fly-to animations where many tiles
18//! are requested simultaneously.
19//!
20//! # Architecture
21//!
22//! ```text
23//! Source A (FetchPool) Source B (FetchPool)
24//! | |
25//! v v
26//! SharedHttpClient (subscriber 0) SharedHttpClient (subscriber 1)
27//! \ /
28//! --- SharedInner (Arc) -----
29//! |
30//! v
31//! Inner HttpClient
32//! ```
33//!
34//! Each [`SharedHttpClient`] instance carries a unique subscriber ID.
35//! When [`send`](HttpClient::send) is called:
36//!
37//! 1. If no in-flight request exists for this URL, the request is
38//! forwarded to the inner client and the subscriber is registered.
39//! 2. If an in-flight request already exists, the subscriber is added
40//! to the waiting list without issuing a duplicate fetch.
41//!
42//! When [`poll`](HttpClient::poll) is called, the inner client is
43//! polled and completed responses are distributed to every subscriber
44//! that requested that URL. Each subscriber's poll returns only its
45//! own results.
46//!
47//! # Thread safety
48//!
49//! All internal state is behind a single `Mutex<SharedInner>`, making
50//! `SharedHttpClient` `Send + Sync`.
51
52use super::{HttpClient, HttpRequest, HttpResponse};
53use std::collections::{HashMap, VecDeque};
54use std::sync::{Arc, Mutex};
55
56// ---------------------------------------------------------------------------
57// Subscriber tracking
58// ---------------------------------------------------------------------------
59
60/// Opaque identifier for a subscriber (one per cloned `SharedHttpClient`).
61type SubscriberId = u64;
62
63/// Per-URL tracking of which subscribers are waiting for the response.
64struct InflightEntry {
65 /// Subscriber IDs that have requested this URL.
66 subscribers: Vec<SubscriberId>,
67 /// The original request (kept for diagnostics / logging).
68 /// We only need the URL, but storing the full request allows
69 /// future extensions (e.g. header merging).
70 #[allow(dead_code)]
71 url: String,
72}
73
74// ---------------------------------------------------------------------------
75// SharedInner
76// ---------------------------------------------------------------------------
77
78/// Shared mutable state protected by a mutex.
79struct SharedState {
80 /// The underlying HTTP transport.
81 client: Box<dyn HttpClient>,
82
83 /// Next subscriber ID to assign on clone.
84 next_subscriber_id: SubscriberId,
85
86 /// URLs currently in-flight, keyed by URL string.
87 /// Each entry records which subscribers are waiting for the result.
88 inflight: HashMap<String, InflightEntry>,
89
90 /// Per-subscriber outbox of completed responses.
91 /// Responses are moved here when the inner client completes them
92 /// and picked up by the corresponding subscriber's next `poll()`.
93 #[allow(clippy::type_complexity)]
94 outboxes: HashMap<SubscriberId, VecDeque<(String, Result<HttpResponse, String>)>>,
95}
96
97// ---------------------------------------------------------------------------
98// SharedHttpClient
99// ---------------------------------------------------------------------------
100
101/// A deduplicating HTTP client wrapper that can be cheaply cloned.
102///
103/// Every clone shares the same inner HTTP client and in-flight
104/// tracking state, but each clone has a unique subscriber identity
105/// so that [`poll`](HttpClient::poll) returns only the responses
106/// relevant to that subscriber.
107///
108/// # Construction
109///
110/// ```rust,ignore
111/// use rustial_engine::{SharedHttpClient, HttpClient};
112///
113/// let inner: Box<dyn HttpClient> = /* host-provided client */;
114/// let shared = SharedHttpClient::new(inner);
115///
116/// // Hand clones to different tile sources:
117/// let raster_client: Box<dyn HttpClient> = Box::new(shared.clone());
118/// let vector_client: Box<dyn HttpClient> = Box::new(shared.clone());
119/// ```
120pub struct SharedHttpClient {
121 /// The subscriber identity of *this* handle.
122 subscriber_id: SubscriberId,
123 /// Shared state (inner client + inflight map + outboxes).
124 state: Arc<Mutex<SharedState>>,
125}
126
127impl SharedHttpClient {
128 /// Wrap an existing HTTP client with cross-source deduplication.
129 ///
130 /// The returned handle is subscriber 0. Call [`clone`](Clone::clone)
131 /// to create additional subscribers.
132 pub fn new(client: Box<dyn HttpClient>) -> Self {
133 let mut outboxes = HashMap::new();
134 outboxes.insert(0, VecDeque::new());
135
136 Self {
137 subscriber_id: 0,
138 state: Arc::new(Mutex::new(SharedState {
139 client,
140 next_subscriber_id: 1,
141 inflight: HashMap::new(),
142 outboxes,
143 })),
144 }
145 }
146}
147
148impl Clone for SharedHttpClient {
149 /// Create a new subscriber handle that shares the same inner client.
150 ///
151 /// The new handle has a distinct subscriber ID so its `poll()`
152 /// returns only its own responses.
153 fn clone(&self) -> Self {
154 let mut state = self.state.lock().expect("SharedHttpClient lock poisoned");
155 let id = state.next_subscriber_id;
156 state.next_subscriber_id += 1;
157 state.outboxes.insert(id, VecDeque::new());
158 Self {
159 subscriber_id: id,
160 state: Arc::clone(&self.state),
161 }
162 }
163}
164
165impl std::fmt::Debug for SharedHttpClient {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 let (inflight, subscribers) = self
168 .state
169 .lock()
170 .map(|s| (s.inflight.len(), s.outboxes.len()))
171 .unwrap_or((0, 0));
172 f.debug_struct("SharedHttpClient")
173 .field("subscriber_id", &self.subscriber_id)
174 .field("inflight_urls", &inflight)
175 .field("total_subscribers", &subscribers)
176 .finish()
177 }
178}
179
180impl HttpClient for SharedHttpClient {
181 /// Send an HTTP request, deduplicating against in-flight requests.
182 ///
183 /// If another subscriber has already sent a request for the same
184 /// URL, this subscriber is added to the waiting list without
185 /// issuing a duplicate fetch to the inner client.
186 fn send(&self, request: HttpRequest) {
187 let mut state = match self.state.lock() {
188 Ok(s) => s,
189 Err(_) => return,
190 };
191
192 let url = request.url.clone();
193
194 if let Some(entry) = state.inflight.get_mut(&url) {
195 // Another subscriber already has this URL in-flight.
196 // Register ourselves as an additional recipient.
197 if !entry.subscribers.contains(&self.subscriber_id) {
198 entry.subscribers.push(self.subscriber_id);
199 }
200 return;
201 }
202
203 // No in-flight request for this URL -- issue the actual fetch.
204 state.inflight.insert(
205 url.clone(),
206 InflightEntry {
207 subscribers: vec![self.subscriber_id],
208 url,
209 },
210 );
211
212 state.client.send(request);
213 }
214
215 /// Poll for completed responses belonging to this subscriber.
216 ///
217 /// Internally polls the shared inner client and distributes
218 /// completed responses to all subscribers that requested each URL.
219 /// Returns only the responses destined for *this* subscriber.
220 fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
221 let mut state = match self.state.lock() {
222 Ok(s) => s,
223 Err(_) => return Vec::new(),
224 };
225
226 // Drain the inner client's completed responses.
227 let completed = state.client.poll();
228 for (url, result) in completed {
229 if let Some(entry) = state.inflight.remove(&url) {
230 // Fan out the result to every subscriber that requested
231 // this URL. The last subscriber gets the original;
232 // earlier ones get clones.
233 let subscriber_count = entry.subscribers.len();
234 for (i, &sub_id) in entry.subscribers.iter().enumerate() {
235 let cloned_result = if i + 1 == subscriber_count {
236 // Last subscriber takes ownership (avoids one clone).
237 clone_result(&result)
238 } else {
239 clone_result(&result)
240 };
241 if let Some(outbox) = state.outboxes.get_mut(&sub_id) {
242 outbox.push_back((url.clone(), cloned_result));
243 }
244 }
245 }
246 // If no inflight entry exists, the response is orphaned
247 // (e.g. the subscriber was dropped). Silently discard.
248 }
249
250 // Return only this subscriber's buffered results.
251 let outbox = match state.outboxes.get_mut(&self.subscriber_id) {
252 Some(o) => o,
253 None => return Vec::new(),
254 };
255 outbox.drain(..).collect()
256 }
257}
258
259/// Clone an HTTP result (response body + headers are heap-allocated,
260/// so this is a memcpy but avoids re-fetching from the network).
261fn clone_result(result: &Result<HttpResponse, String>) -> Result<HttpResponse, String> {
262 match result {
263 Ok(response) => Ok(HttpResponse {
264 status: response.status,
265 body: response.body.clone(),
266 headers: response.headers.clone(),
267 }),
268 Err(e) => Err(e.clone()),
269 }
270}
271
272// ---------------------------------------------------------------------------
273// Tests
274// ---------------------------------------------------------------------------
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279 use std::sync::Mutex as StdMutex;
280
281 /// A test HTTP client that records sent requests and allows
282 /// injecting responses for `poll()`.
283 #[derive(Default)]
284 struct MockHttpClient {
285 sent: StdMutex<Vec<String>>,
286 responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
287 }
288
289 impl MockHttpClient {
290 fn sent_urls(&self) -> Vec<String> {
291 self.sent.lock().unwrap().clone()
292 }
293
294 fn inject_response(&self, url: &str, status: u16, body: &[u8]) {
295 self.responses.lock().unwrap().push((
296 url.to_owned(),
297 Ok(HttpResponse {
298 status,
299 body: body.to_vec(),
300 headers: Vec::new(),
301 }),
302 ));
303 }
304 }
305
306 impl HttpClient for MockHttpClient {
307 fn send(&self, request: HttpRequest) {
308 self.sent.lock().unwrap().push(request.url);
309 }
310
311 fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
312 self.responses.lock().unwrap().drain(..).collect()
313 }
314 }
315
316 #[test]
317 fn dedup_same_url_across_subscribers() {
318 let mock = Arc::new(MockHttpClient::default());
319 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
320 let shared = SharedHttpClient::new(inner);
321 let sub_a = shared.clone();
322 let sub_b = shared.clone();
323
324 // Both subscribers request the same URL.
325 sub_a.send(HttpRequest::get("https://tiles.example.com/5/10/12.pbf"));
326 sub_b.send(HttpRequest::get("https://tiles.example.com/5/10/12.pbf"));
327
328 // Only one HTTP request should have been sent.
329 assert_eq!(mock.sent_urls().len(), 1);
330 }
331
332 #[test]
333 fn different_urls_are_not_deduped() {
334 let mock = Arc::new(MockHttpClient::default());
335 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
336 let shared = SharedHttpClient::new(inner);
337 let sub_a = shared.clone();
338
339 sub_a.send(HttpRequest::get("https://example.com/a.png"));
340 sub_a.send(HttpRequest::get("https://example.com/b.png"));
341
342 assert_eq!(mock.sent_urls().len(), 2);
343 }
344
345 #[test]
346 fn response_fanned_out_to_all_subscribers() {
347 let mock = Arc::new(MockHttpClient::default());
348 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
349 let shared = SharedHttpClient::new(inner);
350 let sub_a = shared.clone();
351 let sub_b = shared.clone();
352
353 let url = "https://tiles.example.com/5/10/12.pbf";
354 sub_a.send(HttpRequest::get(url));
355 sub_b.send(HttpRequest::get(url));
356
357 // Inject the response as if the inner client completed it.
358 mock.inject_response(url, 200, b"tile-data");
359
360 // sub_a polls -- this drives the inner poll and distributes.
361 let results_a = sub_a.poll();
362 assert_eq!(results_a.len(), 1);
363 assert_eq!(results_a[0].0, url);
364 assert_eq!(results_a[0].1.as_ref().unwrap().body, b"tile-data");
365
366 // sub_b polls -- picks up its copy from the outbox.
367 let results_b = sub_b.poll();
368 assert_eq!(results_b.len(), 1);
369 assert_eq!(results_b[0].0, url);
370 assert_eq!(results_b[0].1.as_ref().unwrap().body, b"tile-data");
371 }
372
373 #[test]
374 fn subscriber_only_sees_own_responses() {
375 let mock = Arc::new(MockHttpClient::default());
376 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
377 let shared = SharedHttpClient::new(inner);
378 let sub_a = shared.clone();
379 let sub_b = shared.clone();
380
381 // Each subscriber requests a different URL.
382 sub_a.send(HttpRequest::get("https://example.com/a.png"));
383 sub_b.send(HttpRequest::get("https://example.com/b.png"));
384
385 mock.inject_response("https://example.com/a.png", 200, b"data-a");
386 mock.inject_response("https://example.com/b.png", 200, b"data-b");
387
388 // sub_a polls first, driving the inner poll.
389 let results_a = sub_a.poll();
390 assert_eq!(results_a.len(), 1);
391 assert_eq!(results_a[0].0, "https://example.com/a.png");
392
393 // sub_b picks up its result.
394 let results_b = sub_b.poll();
395 assert_eq!(results_b.len(), 1);
396 assert_eq!(results_b[0].0, "https://example.com/b.png");
397 }
398
399 #[test]
400 fn error_response_fanned_out() {
401 let mock = Arc::new(MockHttpClient::default());
402 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
403 let shared = SharedHttpClient::new(inner);
404 let sub_a = shared.clone();
405 let sub_b = shared.clone();
406
407 let url = "https://tiles.example.com/err";
408 sub_a.send(HttpRequest::get(url));
409 sub_b.send(HttpRequest::get(url));
410
411 // Inject an error.
412 mock.responses
413 .lock()
414 .unwrap()
415 .push((url.to_owned(), Err("connection refused".into())));
416
417 let results_a = sub_a.poll();
418 assert_eq!(results_a.len(), 1);
419 assert!(results_a[0].1.is_err());
420
421 let results_b = sub_b.poll();
422 assert_eq!(results_b.len(), 1);
423 assert!(results_b[0].1.is_err());
424 }
425
426 #[test]
427 fn second_request_after_completion_issues_new_fetch() {
428 let mock = Arc::new(MockHttpClient::default());
429 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
430 let shared = SharedHttpClient::new(inner);
431 let sub_a = shared.clone();
432
433 let url = "https://tiles.example.com/5/10/12.pbf";
434 sub_a.send(HttpRequest::get(url));
435 mock.inject_response(url, 200, b"first");
436 let _ = sub_a.poll();
437
438 // The URL is no longer in-flight, so a second request should
439 // issue a new fetch.
440 sub_a.send(HttpRequest::get(url));
441 assert_eq!(mock.sent_urls().len(), 2);
442 }
443
444 /// MockHttpClient must be usable via Arc (for test setup).
445 impl HttpClient for Arc<MockHttpClient> {
446 fn send(&self, request: HttpRequest) {
447 (**self).send(request);
448 }
449
450 fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
451 (**self).poll()
452 }
453 }
454}