1use cranpose_core::{compositionLocalOfWithPolicy, CompositionLocal};
2#[cfg(all(target_arch = "wasm32", feature = "web-http"))]
3use futures_util::{stream, StreamExt};
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7
8#[derive(thiserror::Error, Debug, Clone)]
9pub enum HttpError {
10 #[error("Failed to build HTTP client: {0}")]
11 ClientInit(String),
12 #[error("Request failed for {url}: {message}")]
13 RequestFailed { url: String, message: String },
14 #[error("Request failed with status {status} for {url}")]
15 HttpStatus { url: String, status: u16 },
16 #[error("Failed to read response body for {url}: {message}")]
17 BodyReadFailed { url: String, message: String },
18 #[error("Invalid response for {url}: {message}")]
19 InvalidResponse { url: String, message: String },
20 #[error("No window object available")]
21 NoWindow,
22 #[error("{operation} worker thread panicked")]
23 WorkerPanicked { operation: &'static str },
24 #[error("{operation} requires cranpose-services feature `{feature}`")]
25 UnsupportedFeature {
26 operation: &'static str,
27 feature: &'static str,
28 },
29}
30
31#[cfg(not(target_arch = "wasm32"))]
32pub type HttpFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, HttpError>> + Send + 'a>>;
33
34#[cfg(target_arch = "wasm32")]
35pub type HttpFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, HttpError>> + 'a>>;
36
37pub trait HttpClient: Send + Sync {
38 fn get_text<'a>(&'a self, url: &'a str) -> HttpFuture<'a, String>;
39
40 fn get_bytes<'a>(&'a self, url: &'a str) -> HttpFuture<'a, Vec<u8>> {
41 Box::pin(async move { self.get_text(url).await.map(|text| text.into_bytes()) })
42 }
43}
44
45pub type HttpClientRef = Arc<dyn HttpClient>;
46
47#[cfg(not(target_arch = "wasm32"))]
48pub async fn map_ordered_concurrent<I, T, F, Fut>(
49 items: &[I],
50 concurrency: usize,
51 task: F,
52) -> Result<Vec<T>, HttpError>
53where
54 I: Clone + Send,
55 T: Send,
56 F: Fn(I) -> Fut + Send + Sync + 'static,
57 Fut: Future<Output = T> + Send,
58{
59 let task = Arc::new(task);
60 let mut results = Vec::with_capacity(items.len());
61
62 for chunk in items.chunks(concurrency.max(1)) {
63 let chunk_results = std::thread::scope(|scope| {
64 let mut handles = Vec::with_capacity(chunk.len());
65 for item in chunk.iter().cloned() {
66 let task = Arc::clone(&task);
67 handles.push(scope.spawn(move || pollster::block_on(task(item))));
68 }
69
70 let mut chunk_results = Vec::with_capacity(handles.len());
71 for handle in handles {
72 let value = handle.join().map_err(|_| HttpError::WorkerPanicked {
73 operation: "ordered concurrent task",
74 })?;
75 chunk_results.push(value);
76 }
77 Ok::<Vec<T>, HttpError>(chunk_results)
78 })?;
79 results.extend(chunk_results);
80 }
81
82 Ok(results)
83}
84
/// Runs `task` over every element of `items` and returns the outputs in input order.
///
/// Up to `concurrency` task futures (at least one) are polled at the same time
/// through a `buffer_unordered` stream. Each output is tagged with the position
/// of its input, and the completed outputs are sorted by that position before
/// being returned. This variant always returns `Ok`.
#[cfg(all(target_arch = "wasm32", feature = "web-http"))]
pub async fn map_ordered_concurrent<I, T, F, Fut>(
    items: &[I],
    concurrency: usize,
    task: F,
) -> Result<Vec<T>, HttpError>
where
    I: Clone,
    F: Fn(I) -> Fut + Clone,
    Fut: Future<Output = T>,
{
    let limit = concurrency.max(1);

    // Pair every job with its input position so completion order can be undone.
    let indexed_jobs = items.iter().cloned().enumerate().map(|(position, item)| {
        let run = task.clone();
        async move {
            let output = run(item).await;
            (position, output)
        }
    });

    let mut completed: Vec<(usize, T)> = stream::iter(indexed_jobs)
        .buffer_unordered(limit)
        .collect()
        .await;

    completed.sort_by_key(|entry| entry.0);

    let ordered = completed.into_iter().map(|(_, output)| output).collect();
    Ok(ordered)
}
107
/// Runs `task` over every element of `items` and returns the outputs in input order.
///
/// Without the `web-http` feature there is no concurrent implementation on
/// wasm32, so the requested concurrency is ignored and the items are processed
/// one after another. This variant always returns `Ok`.
#[cfg(all(target_arch = "wasm32", not(feature = "web-http")))]
pub async fn map_ordered_concurrent<I, T, F, Fut>(
    items: &[I],
    _concurrency: usize,
    task: F,
) -> Result<Vec<T>, HttpError>
where
    I: Clone,
    F: Fn(I) -> Fut,
    Fut: Future<Output = T>,
{
    let ordered = map_ordered_sequential(items, task).await;
    Ok(ordered)
}
121
/// Awaits `task` for each element of `items` in turn and collects the outputs.
///
/// Each element is cloned right before it is handed to `task`; the next task
/// is not started until the previous one has completed.
#[cfg(all(target_arch = "wasm32", not(feature = "web-http")))]
async fn map_ordered_sequential<I, T, F, Fut>(items: &[I], task: F) -> Vec<T>
where
    I: Clone,
    F: Fn(I) -> Fut,
    Fut: Future<Output = T>,
{
    let mut outputs = Vec::with_capacity(items.len());
    for item in items {
        let output = task(item.clone()).await;
        outputs.push(output);
    }
    outputs
}
135
136struct DefaultHttpClient {
137 #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
138 native_client: Result<reqwest::blocking::Client, HttpError>,
139}
140
141impl DefaultHttpClient {
142 fn new() -> Self {
143 Self {
144 #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
145 native_client: build_native_client(),
146 }
147 }
148
149 #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
150 fn native_response(&self, url: &str) -> Result<reqwest::blocking::Response, HttpError> {
151 let response = self
152 .native_client
153 .as_ref()
154 .map_err(Clone::clone)?
155 .get(url)
156 .send()
157 .map_err(|err| HttpError::RequestFailed {
158 url: url.to_string(),
159 message: err.to_string(),
160 })?;
161
162 let status = response.status();
163 if !status.is_success() {
164 return Err(HttpError::HttpStatus {
165 url: url.to_string(),
166 status: status.as_u16(),
167 });
168 }
169
170 Ok(response)
171 }
172
173 #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
174 fn fetch_text_native(&self, url: &str) -> Result<String, HttpError> {
175 self.native_response(url)?
176 .text()
177 .map_err(|err| HttpError::BodyReadFailed {
178 url: url.to_string(),
179 message: err.to_string(),
180 })
181 }
182
183 #[cfg(all(not(target_arch = "wasm32"), not(feature = "http-native")))]
184 fn fetch_text_native(&self, _url: &str) -> Result<String, HttpError> {
185 Err(HttpError::UnsupportedFeature {
186 operation: "native HTTP text requests",
187 feature: "http-native",
188 })
189 }
190
191 #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
192 fn fetch_bytes_native(&self, url: &str) -> Result<Vec<u8>, HttpError> {
193 self.native_response(url)?
194 .bytes()
195 .map(|bytes| bytes.to_vec())
196 .map_err(|err| HttpError::BodyReadFailed {
197 url: url.to_string(),
198 message: err.to_string(),
199 })
200 }
201
202 #[cfg(all(not(target_arch = "wasm32"), not(feature = "http-native")))]
203 fn fetch_bytes_native(&self, _url: &str) -> Result<Vec<u8>, HttpError> {
204 Err(HttpError::UnsupportedFeature {
205 operation: "native HTTP byte requests",
206 feature: "http-native",
207 })
208 }
209}
210
211impl HttpClient for DefaultHttpClient {
212 fn get_text<'a>(&'a self, url: &'a str) -> HttpFuture<'a, String> {
213 Box::pin(async move {
214 #[cfg(not(target_arch = "wasm32"))]
215 {
216 self.fetch_text_native(url)
217 }
218
219 #[cfg(target_arch = "wasm32")]
220 {
221 fetch_text_web(url).await
222 }
223 })
224 }
225
226 fn get_bytes<'a>(&'a self, url: &'a str) -> HttpFuture<'a, Vec<u8>> {
227 Box::pin(async move {
228 #[cfg(not(target_arch = "wasm32"))]
229 {
230 self.fetch_bytes_native(url)
231 }
232
233 #[cfg(target_arch = "wasm32")]
234 {
235 fetch_bytes_web(url).await
236 }
237 })
238 }
239}
240
/// Builds the blocking `reqwest` client used for native requests.
///
/// The client uses a 10 second timeout and the `cranpose/0.1` user agent, plus
/// whatever `configure_native_client_builder` adds for the current platform.
///
/// # Errors
///
/// Returns [`HttpError::ClientInit`] if the builder cannot produce a client,
/// or the error from the platform-specific configuration step.
#[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
fn build_native_client() -> Result<reqwest::blocking::Client, HttpError> {
    let request_timeout = std::time::Duration::from_secs(10);

    let base_builder = reqwest::blocking::Client::builder()
        .timeout(request_timeout)
        .user_agent("cranpose/0.1");
    let configured = configure_native_client_builder(base_builder)?;

    configured
        .build()
        .map_err(|err| HttpError::ClientInit(err.to_string()))
}
253
/// Applies platform-specific settings to the native client builder.
///
/// On Android the builder is restricted to the root certificates returned by
/// `android_root_certificates`; on every other OS it is returned unchanged.
///
/// # Errors
///
/// On Android, propagates any failure to load the root certificates.
#[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
fn configure_native_client_builder(
    builder: reqwest::blocking::ClientBuilder,
) -> Result<reqwest::blocking::ClientBuilder, HttpError> {
    #[cfg(target_os = "android")]
    let builder = builder.tls_certs_only(android_root_certificates()?);

    Ok(builder)
}
268
/// Converts the root certificates shipped in `webpki_root_certs` into
/// `reqwest` certificates for use on Android.
///
/// # Errors
///
/// Returns [`HttpError::ClientInit`] if any certificate fails to parse.
#[cfg(all(target_os = "android", feature = "http-native"))]
fn android_root_certificates() -> Result<Vec<reqwest::Certificate>, HttpError> {
    let bundled_roots = webpki_root_certs::TLS_SERVER_ROOT_CERTS;
    certificates_from_der_chain(bundled_roots.iter().map(|root| root.as_ref()))
}
277
/// Parses a sequence of DER-encoded certificates into `reqwest` certificates.
///
/// Parsing stops at the first certificate that fails.
///
/// # Errors
///
/// Returns [`HttpError::ClientInit`] naming the zero-based position of the
/// certificate that could not be parsed.
#[cfg(any(
    all(test, not(target_arch = "wasm32"), feature = "http-native"),
    all(target_os = "android", feature = "http-native")
))]
fn certificates_from_der_chain<'a, I>(
    certificates: I,
) -> Result<Vec<reqwest::Certificate>, HttpError>
where
    I: IntoIterator<Item = &'a [u8]>,
{
    let mut parsed = Vec::new();

    for (index, der) in certificates.into_iter().enumerate() {
        let certificate = reqwest::Certificate::from_der(der).map_err(|err| {
            HttpError::ClientInit(format!(
                "Failed to load TLS root certificate {index}: {err}"
            ))
        })?;
        parsed.push(certificate);
    }

    Ok(parsed)
}
300
/// Fetches `url` with the browser `fetch` API and returns the body as text.
///
/// The request is a CORS-mode `GET` issued through the global `window`.
///
/// # Errors
///
/// * [`HttpError::RequestFailed`] if the request cannot be built or the fetch
///   promise rejects;
/// * [`HttpError::NoWindow`] if there is no `window` object;
/// * [`HttpError::InvalidResponse`] if the fetch result is not a `Response`
///   or the body does not resolve to a string;
/// * [`HttpError::HttpStatus`] if the response is not `ok`;
/// * [`HttpError::BodyReadFailed`] if reading the body fails.
#[cfg(all(target_arch = "wasm32", feature = "web-http"))]
async fn fetch_text_web(url: &str) -> Result<String, HttpError> {
    use wasm_bindgen::{JsCast, JsValue};
    use wasm_bindgen_futures::JsFuture;
    use web_sys::{Request, RequestInit, RequestMode, Response};

    // Error constructors shared by the steps below.
    let request_failed = |err: JsValue| HttpError::RequestFailed {
        url: url.to_string(),
        message: format!("{:?}", err),
    };
    let body_read_failed = |err: JsValue| HttpError::BodyReadFailed {
        url: url.to_string(),
        message: format!("{:?}", err),
    };

    let init = RequestInit::new();
    init.set_method("GET");
    init.set_mode(RequestMode::Cors);

    let request = Request::new_with_str_and_init(url, &init).map_err(&request_failed)?;

    let window = web_sys::window().ok_or(HttpError::NoWindow)?;
    let fetched = JsFuture::from(window.fetch_with_request(&request))
        .await
        .map_err(&request_failed)?;

    let response: Response = fetched
        .dyn_into()
        .map_err(|_| HttpError::InvalidResponse {
            url: url.to_string(),
            message: "Response is not a Response object".to_string(),
        })?;

    if !response.ok() {
        return Err(HttpError::HttpStatus {
            url: url.to_string(),
            status: response.status(),
        });
    }

    let body_promise = response.text().map_err(&body_read_failed)?;
    let body_value = JsFuture::from(body_promise)
        .await
        .map_err(&body_read_failed)?;

    match body_value.as_string() {
        Some(text) => Ok(text),
        None => Err(HttpError::InvalidResponse {
            url: url.to_string(),
            message: "Response body is not a string".to_string(),
        }),
    }
}
358
/// Stand-in used on wasm32 when `web-http` is disabled: always reports the
/// missing feature.
#[cfg(all(target_arch = "wasm32", not(feature = "web-http")))]
async fn fetch_text_web(_url: &str) -> Result<String, HttpError> {
    let unsupported = HttpError::UnsupportedFeature {
        operation: "web HTTP text requests",
        feature: "web-http",
    };
    Err(unsupported)
}
366
/// Fetches `url` with the browser `fetch` API and returns the body as bytes.
///
/// The request is a CORS-mode `GET` issued through the global `window`; the
/// body is read as an array buffer and copied into a `Vec<u8>`.
///
/// # Errors
///
/// * [`HttpError::RequestFailed`] if the request cannot be built or the fetch
///   promise rejects;
/// * [`HttpError::NoWindow`] if there is no `window` object;
/// * [`HttpError::InvalidResponse`] if the fetch result is not a `Response`;
/// * [`HttpError::HttpStatus`] if the response is not `ok`;
/// * [`HttpError::BodyReadFailed`] if reading the body fails.
#[cfg(all(target_arch = "wasm32", feature = "web-http"))]
async fn fetch_bytes_web(url: &str) -> Result<Vec<u8>, HttpError> {
    use wasm_bindgen::{JsCast, JsValue};
    use wasm_bindgen_futures::JsFuture;
    use web_sys::{Request, RequestInit, RequestMode, Response};

    // Error constructors shared by the steps below.
    let request_failed = |err: JsValue| HttpError::RequestFailed {
        url: url.to_string(),
        message: format!("{:?}", err),
    };
    let body_read_failed = |err: JsValue| HttpError::BodyReadFailed {
        url: url.to_string(),
        message: format!("{:?}", err),
    };

    let init = RequestInit::new();
    init.set_method("GET");
    init.set_mode(RequestMode::Cors);

    let request = Request::new_with_str_and_init(url, &init).map_err(&request_failed)?;

    let window = web_sys::window().ok_or(HttpError::NoWindow)?;
    let fetched = JsFuture::from(window.fetch_with_request(&request))
        .await
        .map_err(&request_failed)?;

    let response: Response = fetched
        .dyn_into()
        .map_err(|_| HttpError::InvalidResponse {
            url: url.to_string(),
            message: "Response is not a Response object".to_string(),
        })?;

    if !response.ok() {
        return Err(HttpError::HttpStatus {
            url: url.to_string(),
            status: response.status(),
        });
    }

    let buffer_promise = response.array_buffer().map_err(&body_read_failed)?;
    let buffer_value = JsFuture::from(buffer_promise)
        .await
        .map_err(&body_read_failed)?;

    let body = js_sys::Uint8Array::new(&buffer_value).to_vec();
    Ok(body)
}
422
/// Stand-in used on wasm32 when `web-http` is disabled: always reports the
/// missing feature.
#[cfg(all(target_arch = "wasm32", not(feature = "web-http")))]
async fn fetch_bytes_web(_url: &str) -> Result<Vec<u8>, HttpError> {
    let unsupported = HttpError::UnsupportedFeature {
        operation: "web HTTP byte requests",
        feature: "web-http",
    };
    Err(unsupported)
}
430
431pub fn default_http_client() -> HttpClientRef {
432 Arc::new(DefaultHttpClient::new())
433}
434
435pub fn local_http_client() -> CompositionLocal<HttpClientRef> {
436 thread_local! {
437 static LOCAL_HTTP_CLIENT: std::cell::RefCell<Option<CompositionLocal<HttpClientRef>>> = const { std::cell::RefCell::new(None) };
438 }
439
440 LOCAL_HTTP_CLIENT.with(|cell| {
441 let mut local = cell.borrow_mut();
442 local
443 .get_or_insert_with(|| compositionLocalOfWithPolicy(default_http_client, Arc::ptr_eq))
444 .clone()
445 })
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use crate::run_test_composition;
    use cranpose_core::CompositionLocalProvider;
    use std::cell::RefCell;
    use std::rc::Rc;
    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
    use std::thread;

    /// Client that answers every text request with `"ok"` and relies on the
    /// trait's default `get_bytes`.
    struct TestHttpClient;

    impl HttpClient for TestHttpClient {
        fn get_text<'a>(&'a self, _url: &'a str) -> HttpFuture<'a, String> {
            Box::pin(async { Ok("ok".to_string()) })
        }
    }

    // The default client can be created and shared like any other `Arc`.
    #[test]
    fn default_http_client_is_available() {
        let client = default_http_client();
        let cloned = client.clone();
        assert_eq!(Arc::strong_count(&client), 2);
        drop(cloned);
        assert_eq!(Arc::strong_count(&client), 1);
    }

    // Source-level guard: scans this file for markers of a process-wide cached
    // client. The needles are assembled from pieces so that this test does not
    // match its own source text.
    #[test]
    fn default_http_client_has_no_process_global_native_client_cache() {
        let source = include_str!("http.rs");
        let once_lock = ["Once", "Lock"].concat();
        let static_client = ["static ", "CLIENT"].concat();
        let native_client_fn = ["fn ", "native_client()"].concat();

        assert!(
            !source.contains(&static_client)
                && !source.contains(&native_client_fn)
                && !source.contains(&once_lock),
            "native HTTP client state must be owned by DefaultHttpClient instead of a process-global cache"
        );
    }

    // The trait's default `get_bytes` returns the bytes of `get_text`.
    #[test]
    fn test_client_uses_default_get_bytes_from_text() {
        let client = TestHttpClient;
        let bytes = pollster::block_on(client.get_bytes("https://example.com")).expect("bytes");
        assert_eq!(bytes, b"ok".to_vec());
    }

    // Outputs come back in input order even when split across several chunks.
    #[test]
    fn map_ordered_concurrent_preserves_input_order() {
        let inputs = [3usize, 1, 4, 1, 5];
        let outputs = pollster::block_on(map_ordered_concurrent(&inputs, 2, |value| async move {
            value * 10
        }))
        .expect("ordered concurrent mapping");

        assert_eq!(outputs, vec![30, 10, 40, 10, 50]);
    }

    // Source-level guard: the native concurrency helper must be gated on the
    // target architecture only, never on the HTTP client feature.
    #[test]
    fn native_ordered_concurrency_is_not_tied_to_native_http_client_feature() {
        let source = include_str!("http.rs");
        assert!(
            source.contains(
                "#[cfg(not(target_arch = \"wasm32\"))]\npub async fn map_ordered_concurrent"
            ),
            "native ordered concurrency must stay available without the HTTP client feature"
        );
        assert!(
            !source.contains(
                "all(not(target_arch = \"wasm32\"), feature = \"http-native\")]\npub async fn map_ordered_concurrent"
            ) && !source.contains(
                "all(not(target_arch = \"wasm32\"), not(feature = \"http-native\"))]\npub async fn map_ordered_concurrent"
            ),
            "native ordered concurrency must not split into HTTP-enabled and HTTP-disabled behavior"
        );
    }

    // A panicking worker is surfaced as `HttpError::WorkerPanicked`.
    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn map_ordered_concurrent_reports_worker_panic() {
        let inputs = [1usize];
        let should_panic = Arc::new(std::sync::atomic::AtomicBool::new(true));
        let should_panic_for_task = Arc::clone(&should_panic);

        let error = pollster::block_on(map_ordered_concurrent(&inputs, 1, move |_| {
            let should_panic = Arc::clone(&should_panic_for_task);
            async move {
                if should_panic.load(std::sync::atomic::Ordering::SeqCst) {
                    panic!("test worker panic");
                }
                1usize
            }
        }))
        .expect_err("worker panic should be reported");

        assert!(matches!(error, HttpError::WorkerPanicked { .. }));
    }

    // A value provided through `CompositionLocalProvider` replaces the default
    // client for readers inside the provider's content.
    #[test]
    fn local_http_client_can_be_overridden() {
        let local = local_http_client();
        let default_client = default_http_client();
        let custom_client: HttpClientRef = Arc::new(TestHttpClient);
        let captured = Rc::new(RefCell::new(None));

        {
            let captured_for_closure = Rc::clone(&captured);
            let custom_client = custom_client.clone();
            let local_for_provider = local.clone();
            let local_for_read = local.clone();
            run_test_composition(move || {
                let captured = Rc::clone(&captured_for_closure);
                let local_for_read = local_for_read.clone();
                CompositionLocalProvider(
                    vec![local_for_provider.provides(custom_client.clone())],
                    move || {
                        let current = local_for_read.current();
                        *captured.borrow_mut() = Some(current);
                    },
                );
            });
        }

        let current = captured.borrow().as_ref().expect("client captured").clone();
        assert!(Arc::ptr_eq(&current, &custom_client));
        assert!(!Arc::ptr_eq(&current, &default_client));
    }

    // Without `http-native`, native requests fail with `UnsupportedFeature`.
    #[cfg(all(not(target_arch = "wasm32"), not(feature = "http-native")))]
    #[test]
    fn default_http_client_reports_disabled_native_http_feature() {
        let error = pollster::block_on(default_http_client().get_text("https://example.com"))
            .expect_err("native HTTP should be feature-gated");

        assert!(matches!(
            error,
            HttpError::UnsupportedFeature {
                feature: "http-native",
                ..
            }
        ));
    }

    // The native client can be constructed with the configured options.
    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
    #[test]
    fn native_http_client_builds() {
        build_native_client().expect("native HTTP client should initialize");
    }

    // The first three bundled root certificates parse successfully.
    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
    #[test]
    fn certificates_from_der_chain_accepts_valid_roots() {
        let certificates = certificates_from_der_chain(
            webpki_root_certs::TLS_SERVER_ROOT_CERTS
                .iter()
                .take(3)
                .map(|certificate| certificate.as_ref()),
        )
        .expect("root certificates should parse");

        assert_eq!(certificates.len(), 3);
    }

    // End-to-end check against a one-shot HTTP server on a loopback port.
    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
    #[test]
    fn default_http_client_fetches_text_from_local_server() {
        use std::io::{Read, Write};
        use std::net::TcpListener;

        // Sandboxed environments may forbid binding sockets; skip rather than fail.
        let listener = match TcpListener::bind("127.0.0.1:0") {
            Ok(listener) => listener,
            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
                eprintln!("skipping local HTTP server bind in restricted test environment: {err}");
                return;
            }
            Err(err) => panic!("bind local test server: {err}"),
        };
        let address = listener
            .local_addr()
            .expect("read local test server address");
        // Serve exactly one request with a fixed body, then close the connection.
        let server = thread::spawn(move || {
            let (mut stream, _) = listener.accept().expect("accept local test request");
            let mut request = [0_u8; 1024];
            let _ = stream.read(&mut request).expect("read local test request");
            let body = "cranpose-http-test";
            write!(
                stream,
                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                body.len(),
                body
            )
            .expect("write local test response");
        });

        let url = format!("http://{address}");
        let text = pollster::block_on(default_http_client().get_text(&url))
            .expect("fetch text from local test server");
        server.join().expect("join local test server");

        assert_eq!(text, "cranpose-http-test");
    }
}