rustial_engine/io/shared_http_client.rs
1//! Cross-source HTTP request deduplication layer.
2//!
3//! [`SharedHttpClient`] wraps an inner [`HttpClient`] and guarantees
4//! that concurrent requests for the **same URL** result in a single
5//! network round-trip. When multiple tile sources (raster, vector,
6//! terrain) request the same tile URL within the same frame window,
7//! only one HTTP request is sent and the response is cloned to every
8//! waiting caller.
9//!
10//! # Motivation
11//!
12//! In MapLibre GL JS, `SourceCache` batches requests across sources
13//! that share the same tile endpoint. Without dedup, a raster layer
14//! and a vector layer that happen to request the same `{z}/{x}/{y}`
15//! tile from the same server produce two independent HTTP fetches.
16//! This wastes bandwidth and connection slots -- particularly harmful
17//! on mobile connections and during fly-to animations where many tiles
18//! are requested simultaneously.
19//!
20//! # Architecture
21//!
22//! ```text
23//! Source A (FetchPool) Source B (FetchPool)
24//! | |
25//! v v
26//! SharedHttpClient (subscriber 0) SharedHttpClient (subscriber 1)
27//! \ /
28//! --- SharedInner (Arc) -----
29//! |
30//! v
31//! Inner HttpClient
32//! ```
33//!
34//! Each [`SharedHttpClient`] instance carries a unique subscriber ID.
35//! When [`send`](HttpClient::send) is called:
36//!
37//! 1. If no in-flight request exists for this URL, the request is
38//! forwarded to the inner client and the subscriber is registered.
39//! 2. If an in-flight request already exists, the subscriber is added
40//! to the waiting list without issuing a duplicate fetch.
41//!
42//! When [`poll`](HttpClient::poll) is called, the inner client is
43//! polled and completed responses are distributed to every subscriber
44//! that requested that URL. Each subscriber's poll returns only its
45//! own results.
46//!
47//! # Thread safety
48//!
49//! All internal state is behind a single `Mutex<SharedInner>`, making
50//! `SharedHttpClient` `Send + Sync`.
51
52use super::{HttpClient, HttpRequest, HttpResponse};
53use std::collections::{HashMap, VecDeque};
54use std::sync::{Arc, Mutex};
55
56// ---------------------------------------------------------------------------
57// Subscriber tracking
58// ---------------------------------------------------------------------------
59
60/// Opaque identifier for a subscriber (one per cloned `SharedHttpClient`).
61type SubscriberId = u64;
62
63/// Per-URL tracking of which subscribers are waiting for the response.
64struct InflightEntry {
65 /// Subscriber IDs that have requested this URL.
66 subscribers: Vec<SubscriberId>,
67 /// The original request (kept for diagnostics / logging).
68 /// We only need the URL, but storing the full request allows
69 /// future extensions (e.g. header merging).
70 #[allow(dead_code)]
71 url: String,
72}
73
74// ---------------------------------------------------------------------------
75// SharedInner
76// ---------------------------------------------------------------------------
77
78/// Shared mutable state protected by a mutex.
79struct SharedState {
80 /// The underlying HTTP transport.
81 client: Box<dyn HttpClient>,
82
83 /// Next subscriber ID to assign on clone.
84 next_subscriber_id: SubscriberId,
85
86 /// URLs currently in-flight, keyed by URL string.
87 /// Each entry records which subscribers are waiting for the result.
88 inflight: HashMap<String, InflightEntry>,
89
90 /// Per-subscriber outbox of completed responses.
91 /// Responses are moved here when the inner client completes them
92 /// and picked up by the corresponding subscriber's next `poll()`.
93 outboxes: HashMap<SubscriberId, VecDeque<(String, Result<HttpResponse, String>)>>,
94}
95
96// ---------------------------------------------------------------------------
97// SharedHttpClient
98// ---------------------------------------------------------------------------
99
100/// A deduplicating HTTP client wrapper that can be cheaply cloned.
101///
102/// Every clone shares the same inner HTTP client and in-flight
103/// tracking state, but each clone has a unique subscriber identity
104/// so that [`poll`](HttpClient::poll) returns only the responses
105/// relevant to that subscriber.
106///
107/// # Construction
108///
109/// ```rust,ignore
110/// use rustial_engine::{SharedHttpClient, HttpClient};
111///
112/// let inner: Box<dyn HttpClient> = /* host-provided client */;
113/// let shared = SharedHttpClient::new(inner);
114///
115/// // Hand clones to different tile sources:
116/// let raster_client: Box<dyn HttpClient> = Box::new(shared.clone());
117/// let vector_client: Box<dyn HttpClient> = Box::new(shared.clone());
118/// ```
119pub struct SharedHttpClient {
120 /// The subscriber identity of *this* handle.
121 subscriber_id: SubscriberId,
122 /// Shared state (inner client + inflight map + outboxes).
123 state: Arc<Mutex<SharedState>>,
124}
125
126impl SharedHttpClient {
127 /// Wrap an existing HTTP client with cross-source deduplication.
128 ///
129 /// The returned handle is subscriber 0. Call [`clone`](Clone::clone)
130 /// to create additional subscribers.
131 pub fn new(client: Box<dyn HttpClient>) -> Self {
132 let mut outboxes = HashMap::new();
133 outboxes.insert(0, VecDeque::new());
134
135 Self {
136 subscriber_id: 0,
137 state: Arc::new(Mutex::new(SharedState {
138 client,
139 next_subscriber_id: 1,
140 inflight: HashMap::new(),
141 outboxes,
142 })),
143 }
144 }
145}
146
147impl Clone for SharedHttpClient {
148 /// Create a new subscriber handle that shares the same inner client.
149 ///
150 /// The new handle has a distinct subscriber ID so its `poll()`
151 /// returns only its own responses.
152 fn clone(&self) -> Self {
153 let mut state = self.state.lock().expect("SharedHttpClient lock poisoned");
154 let id = state.next_subscriber_id;
155 state.next_subscriber_id += 1;
156 state.outboxes.insert(id, VecDeque::new());
157 Self {
158 subscriber_id: id,
159 state: Arc::clone(&self.state),
160 }
161 }
162}
163
164impl std::fmt::Debug for SharedHttpClient {
165 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 let (inflight, subscribers) = self
167 .state
168 .lock()
169 .map(|s| (s.inflight.len(), s.outboxes.len()))
170 .unwrap_or((0, 0));
171 f.debug_struct("SharedHttpClient")
172 .field("subscriber_id", &self.subscriber_id)
173 .field("inflight_urls", &inflight)
174 .field("total_subscribers", &subscribers)
175 .finish()
176 }
177}
178
179impl HttpClient for SharedHttpClient {
180 /// Send an HTTP request, deduplicating against in-flight requests.
181 ///
182 /// If another subscriber has already sent a request for the same
183 /// URL, this subscriber is added to the waiting list without
184 /// issuing a duplicate fetch to the inner client.
185 fn send(&self, request: HttpRequest) {
186 let mut state = match self.state.lock() {
187 Ok(s) => s,
188 Err(_) => return,
189 };
190
191 let url = request.url.clone();
192
193 if let Some(entry) = state.inflight.get_mut(&url) {
194 // Another subscriber already has this URL in-flight.
195 // Register ourselves as an additional recipient.
196 if !entry.subscribers.contains(&self.subscriber_id) {
197 entry.subscribers.push(self.subscriber_id);
198 }
199 return;
200 }
201
202 // No in-flight request for this URL -- issue the actual fetch.
203 state.inflight.insert(
204 url.clone(),
205 InflightEntry {
206 subscribers: vec![self.subscriber_id],
207 url,
208 },
209 );
210
211 state.client.send(request);
212 }
213
214 /// Poll for completed responses belonging to this subscriber.
215 ///
216 /// Internally polls the shared inner client and distributes
217 /// completed responses to all subscribers that requested each URL.
218 /// Returns only the responses destined for *this* subscriber.
219 fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
220 let mut state = match self.state.lock() {
221 Ok(s) => s,
222 Err(_) => return Vec::new(),
223 };
224
225 // Drain the inner client's completed responses.
226 let completed = state.client.poll();
227 for (url, result) in completed {
228 if let Some(entry) = state.inflight.remove(&url) {
229 // Fan out the result to every subscriber that requested
230 // this URL. The last subscriber gets the original;
231 // earlier ones get clones.
232 let subscriber_count = entry.subscribers.len();
233 for (i, &sub_id) in entry.subscribers.iter().enumerate() {
234 let cloned_result = if i + 1 == subscriber_count {
235 // Last subscriber takes ownership (avoids one clone).
236 clone_result(&result)
237 } else {
238 clone_result(&result)
239 };
240 if let Some(outbox) = state.outboxes.get_mut(&sub_id) {
241 outbox.push_back((url.clone(), cloned_result));
242 }
243 }
244 }
245 // If no inflight entry exists, the response is orphaned
246 // (e.g. the subscriber was dropped). Silently discard.
247 }
248
249 // Return only this subscriber's buffered results.
250 let outbox = match state.outboxes.get_mut(&self.subscriber_id) {
251 Some(o) => o,
252 None => return Vec::new(),
253 };
254 outbox.drain(..).collect()
255 }
256}
257
258/// Clone an HTTP result (response body + headers are heap-allocated,
259/// so this is a memcpy but avoids re-fetching from the network).
260fn clone_result(
261 result: &Result<HttpResponse, String>,
262) -> Result<HttpResponse, String> {
263 match result {
264 Ok(response) => Ok(HttpResponse {
265 status: response.status,
266 body: response.body.clone(),
267 headers: response.headers.clone(),
268 }),
269 Err(e) => Err(e.clone()),
270 }
271}
272
273// ---------------------------------------------------------------------------
274// Tests
275// ---------------------------------------------------------------------------
276
277#[cfg(test)]
278mod tests {
279 use super::*;
280 use std::sync::Mutex as StdMutex;
281
282 /// A test HTTP client that records sent requests and allows
283 /// injecting responses for `poll()`.
284 #[derive(Default)]
285 struct MockHttpClient {
286 sent: StdMutex<Vec<String>>,
287 responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
288 }
289
290 impl MockHttpClient {
291 fn sent_urls(&self) -> Vec<String> {
292 self.sent.lock().unwrap().clone()
293 }
294
295 fn inject_response(&self, url: &str, status: u16, body: &[u8]) {
296 self.responses.lock().unwrap().push((
297 url.to_owned(),
298 Ok(HttpResponse {
299 status,
300 body: body.to_vec(),
301 headers: Vec::new(),
302 }),
303 ));
304 }
305 }
306
307 impl HttpClient for MockHttpClient {
308 fn send(&self, request: HttpRequest) {
309 self.sent.lock().unwrap().push(request.url);
310 }
311
312 fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
313 self.responses.lock().unwrap().drain(..).collect()
314 }
315 }
316
317 #[test]
318 fn dedup_same_url_across_subscribers() {
319 let mock = Arc::new(MockHttpClient::default());
320 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
321 let shared = SharedHttpClient::new(inner);
322 let sub_a = shared.clone();
323 let sub_b = shared.clone();
324
325 // Both subscribers request the same URL.
326 sub_a.send(HttpRequest::get("https://tiles.example.com/5/10/12.pbf"));
327 sub_b.send(HttpRequest::get("https://tiles.example.com/5/10/12.pbf"));
328
329 // Only one HTTP request should have been sent.
330 assert_eq!(mock.sent_urls().len(), 1);
331 }
332
333 #[test]
334 fn different_urls_are_not_deduped() {
335 let mock = Arc::new(MockHttpClient::default());
336 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
337 let shared = SharedHttpClient::new(inner);
338 let sub_a = shared.clone();
339
340 sub_a.send(HttpRequest::get("https://example.com/a.png"));
341 sub_a.send(HttpRequest::get("https://example.com/b.png"));
342
343 assert_eq!(mock.sent_urls().len(), 2);
344 }
345
346 #[test]
347 fn response_fanned_out_to_all_subscribers() {
348 let mock = Arc::new(MockHttpClient::default());
349 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
350 let shared = SharedHttpClient::new(inner);
351 let sub_a = shared.clone();
352 let sub_b = shared.clone();
353
354 let url = "https://tiles.example.com/5/10/12.pbf";
355 sub_a.send(HttpRequest::get(url));
356 sub_b.send(HttpRequest::get(url));
357
358 // Inject the response as if the inner client completed it.
359 mock.inject_response(url, 200, b"tile-data");
360
361 // sub_a polls -- this drives the inner poll and distributes.
362 let results_a = sub_a.poll();
363 assert_eq!(results_a.len(), 1);
364 assert_eq!(results_a[0].0, url);
365 assert_eq!(
366 results_a[0].1.as_ref().unwrap().body,
367 b"tile-data"
368 );
369
370 // sub_b polls -- picks up its copy from the outbox.
371 let results_b = sub_b.poll();
372 assert_eq!(results_b.len(), 1);
373 assert_eq!(results_b[0].0, url);
374 assert_eq!(
375 results_b[0].1.as_ref().unwrap().body,
376 b"tile-data"
377 );
378 }
379
380 #[test]
381 fn subscriber_only_sees_own_responses() {
382 let mock = Arc::new(MockHttpClient::default());
383 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
384 let shared = SharedHttpClient::new(inner);
385 let sub_a = shared.clone();
386 let sub_b = shared.clone();
387
388 // Each subscriber requests a different URL.
389 sub_a.send(HttpRequest::get("https://example.com/a.png"));
390 sub_b.send(HttpRequest::get("https://example.com/b.png"));
391
392 mock.inject_response("https://example.com/a.png", 200, b"data-a");
393 mock.inject_response("https://example.com/b.png", 200, b"data-b");
394
395 // sub_a polls first, driving the inner poll.
396 let results_a = sub_a.poll();
397 assert_eq!(results_a.len(), 1);
398 assert_eq!(results_a[0].0, "https://example.com/a.png");
399
400 // sub_b picks up its result.
401 let results_b = sub_b.poll();
402 assert_eq!(results_b.len(), 1);
403 assert_eq!(results_b[0].0, "https://example.com/b.png");
404 }
405
406 #[test]
407 fn error_response_fanned_out() {
408 let mock = Arc::new(MockHttpClient::default());
409 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
410 let shared = SharedHttpClient::new(inner);
411 let sub_a = shared.clone();
412 let sub_b = shared.clone();
413
414 let url = "https://tiles.example.com/err";
415 sub_a.send(HttpRequest::get(url));
416 sub_b.send(HttpRequest::get(url));
417
418 // Inject an error.
419 mock.responses
420 .lock()
421 .unwrap()
422 .push((url.to_owned(), Err("connection refused".into())));
423
424 let results_a = sub_a.poll();
425 assert_eq!(results_a.len(), 1);
426 assert!(results_a[0].1.is_err());
427
428 let results_b = sub_b.poll();
429 assert_eq!(results_b.len(), 1);
430 assert!(results_b[0].1.is_err());
431 }
432
433 #[test]
434 fn second_request_after_completion_issues_new_fetch() {
435 let mock = Arc::new(MockHttpClient::default());
436 let inner: Box<dyn HttpClient> = Box::new(Arc::clone(&mock));
437 let shared = SharedHttpClient::new(inner);
438 let sub_a = shared.clone();
439
440 let url = "https://tiles.example.com/5/10/12.pbf";
441 sub_a.send(HttpRequest::get(url));
442 mock.inject_response(url, 200, b"first");
443 let _ = sub_a.poll();
444
445 // The URL is no longer in-flight, so a second request should
446 // issue a new fetch.
447 sub_a.send(HttpRequest::get(url));
448 assert_eq!(mock.sent_urls().len(), 2);
449 }
450
451 /// MockHttpClient must be usable via Arc (for test setup).
452 impl HttpClient for Arc<MockHttpClient> {
453 fn send(&self, request: HttpRequest) {
454 (**self).send(request);
455 }
456
457 fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
458 (**self).poll()
459 }
460 }
461}