Skip to main content

cranpose_services/
http.rs

1use cranpose_core::{compositionLocalOfWithPolicy, CompositionLocal};
2#[cfg(all(target_arch = "wasm32", feature = "web-http"))]
3use futures_util::{stream, StreamExt};
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7
8#[derive(thiserror::Error, Debug, Clone)]
9pub enum HttpError {
10    #[error("Failed to build HTTP client: {0}")]
11    ClientInit(String),
12    #[error("Request failed for {url}: {message}")]
13    RequestFailed { url: String, message: String },
14    #[error("Request failed with status {status} for {url}")]
15    HttpStatus { url: String, status: u16 },
16    #[error("Failed to read response body for {url}: {message}")]
17    BodyReadFailed { url: String, message: String },
18    #[error("Invalid response for {url}: {message}")]
19    InvalidResponse { url: String, message: String },
20    #[error("No window object available")]
21    NoWindow,
22    #[error("{operation} worker thread panicked")]
23    WorkerPanicked { operation: &'static str },
24    #[error("{operation} requires cranpose-services feature `{feature}`")]
25    UnsupportedFeature {
26        operation: &'static str,
27        feature: &'static str,
28    },
29}
30
31#[cfg(not(target_arch = "wasm32"))]
32pub type HttpFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, HttpError>> + Send + 'a>>;
33
34#[cfg(target_arch = "wasm32")]
35pub type HttpFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, HttpError>> + 'a>>;
36
37pub trait HttpClient: Send + Sync {
38    fn get_text<'a>(&'a self, url: &'a str) -> HttpFuture<'a, String>;
39
40    fn get_bytes<'a>(&'a self, url: &'a str) -> HttpFuture<'a, Vec<u8>> {
41        Box::pin(async move { self.get_text(url).await.map(|text| text.into_bytes()) })
42    }
43}
44
45pub type HttpClientRef = Arc<dyn HttpClient>;
46
47#[cfg(not(target_arch = "wasm32"))]
48pub async fn map_ordered_concurrent<I, T, F, Fut>(
49    items: &[I],
50    concurrency: usize,
51    task: F,
52) -> Result<Vec<T>, HttpError>
53where
54    I: Clone + Send,
55    T: Send,
56    F: Fn(I) -> Fut + Send + Sync + 'static,
57    Fut: Future<Output = T> + Send,
58{
59    let task = Arc::new(task);
60    let mut results = Vec::with_capacity(items.len());
61
62    for chunk in items.chunks(concurrency.max(1)) {
63        let chunk_results = std::thread::scope(|scope| {
64            let mut handles = Vec::with_capacity(chunk.len());
65            for item in chunk.iter().cloned() {
66                let task = Arc::clone(&task);
67                handles.push(scope.spawn(move || pollster::block_on(task(item))));
68            }
69
70            let mut chunk_results = Vec::with_capacity(handles.len());
71            for handle in handles {
72                let value = handle.join().map_err(|_| HttpError::WorkerPanicked {
73                    operation: "ordered concurrent task",
74                })?;
75                chunk_results.push(value);
76            }
77            Ok::<Vec<T>, HttpError>(chunk_results)
78        })?;
79        results.extend(chunk_results);
80    }
81
82    Ok(results)
83}
84
85#[cfg(all(target_arch = "wasm32", feature = "web-http"))]
86pub async fn map_ordered_concurrent<I, T, F, Fut>(
87    items: &[I],
88    concurrency: usize,
89    task: F,
90) -> Result<Vec<T>, HttpError>
91where
92    I: Clone,
93    F: Fn(I) -> Fut + Clone,
94    Fut: Future<Output = T>,
95{
96    let mut results = stream::iter(items.iter().cloned().enumerate().map(|(index, item)| {
97        let task = task.clone();
98        async move { (index, task(item).await) }
99    }))
100    .buffer_unordered(concurrency.max(1))
101    .collect::<Vec<_>>()
102    .await;
103
104    results.sort_by_key(|(index, _)| *index);
105    Ok(results.into_iter().map(|(_, value)| value).collect())
106}
107
108#[cfg(all(target_arch = "wasm32", not(feature = "web-http")))]
109pub async fn map_ordered_concurrent<I, T, F, Fut>(
110    items: &[I],
111    _concurrency: usize,
112    task: F,
113) -> Result<Vec<T>, HttpError>
114where
115    I: Clone,
116    F: Fn(I) -> Fut,
117    Fut: Future<Output = T>,
118{
119    Ok(map_ordered_sequential(items, task).await)
120}
121
122#[cfg(all(target_arch = "wasm32", not(feature = "web-http")))]
123async fn map_ordered_sequential<I, T, F, Fut>(items: &[I], task: F) -> Vec<T>
124where
125    I: Clone,
126    F: Fn(I) -> Fut,
127    Fut: Future<Output = T>,
128{
129    let mut results = Vec::with_capacity(items.len());
130    for item in items.iter().cloned() {
131        results.push(task(item).await);
132    }
133    results
134}
135
136struct DefaultHttpClient {
137    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
138    native_client: Result<reqwest::blocking::Client, HttpError>,
139}
140
141impl DefaultHttpClient {
142    fn new() -> Self {
143        Self {
144            #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
145            native_client: build_native_client(),
146        }
147    }
148
149    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
150    fn native_response(&self, url: &str) -> Result<reqwest::blocking::Response, HttpError> {
151        let response = self
152            .native_client
153            .as_ref()
154            .map_err(Clone::clone)?
155            .get(url)
156            .send()
157            .map_err(|err| HttpError::RequestFailed {
158                url: url.to_string(),
159                message: err.to_string(),
160            })?;
161
162        let status = response.status();
163        if !status.is_success() {
164            return Err(HttpError::HttpStatus {
165                url: url.to_string(),
166                status: status.as_u16(),
167            });
168        }
169
170        Ok(response)
171    }
172
173    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
174    fn fetch_text_native(&self, url: &str) -> Result<String, HttpError> {
175        self.native_response(url)?
176            .text()
177            .map_err(|err| HttpError::BodyReadFailed {
178                url: url.to_string(),
179                message: err.to_string(),
180            })
181    }
182
183    #[cfg(all(not(target_arch = "wasm32"), not(feature = "http-native")))]
184    fn fetch_text_native(&self, _url: &str) -> Result<String, HttpError> {
185        Err(HttpError::UnsupportedFeature {
186            operation: "native HTTP text requests",
187            feature: "http-native",
188        })
189    }
190
191    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
192    fn fetch_bytes_native(&self, url: &str) -> Result<Vec<u8>, HttpError> {
193        self.native_response(url)?
194            .bytes()
195            .map(|bytes| bytes.to_vec())
196            .map_err(|err| HttpError::BodyReadFailed {
197                url: url.to_string(),
198                message: err.to_string(),
199            })
200    }
201
202    #[cfg(all(not(target_arch = "wasm32"), not(feature = "http-native")))]
203    fn fetch_bytes_native(&self, _url: &str) -> Result<Vec<u8>, HttpError> {
204        Err(HttpError::UnsupportedFeature {
205            operation: "native HTTP byte requests",
206            feature: "http-native",
207        })
208    }
209}
210
211impl HttpClient for DefaultHttpClient {
212    fn get_text<'a>(&'a self, url: &'a str) -> HttpFuture<'a, String> {
213        Box::pin(async move {
214            #[cfg(not(target_arch = "wasm32"))]
215            {
216                self.fetch_text_native(url)
217            }
218
219            #[cfg(target_arch = "wasm32")]
220            {
221                fetch_text_web(url).await
222            }
223        })
224    }
225
226    fn get_bytes<'a>(&'a self, url: &'a str) -> HttpFuture<'a, Vec<u8>> {
227        Box::pin(async move {
228            #[cfg(not(target_arch = "wasm32"))]
229            {
230                self.fetch_bytes_native(url)
231            }
232
233            #[cfg(target_arch = "wasm32")]
234            {
235                fetch_bytes_web(url).await
236            }
237        })
238    }
239}
240
241#[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
242fn build_native_client() -> Result<reqwest::blocking::Client, HttpError> {
243    use std::time::Duration;
244
245    configure_native_client_builder(
246        reqwest::blocking::Client::builder()
247            .timeout(Duration::from_secs(10))
248            .user_agent("cranpose/0.1"),
249    )?
250    .build()
251    .map_err(|err| HttpError::ClientInit(err.to_string()))
252}
253
254#[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
255fn configure_native_client_builder(
256    builder: reqwest::blocking::ClientBuilder,
257) -> Result<reqwest::blocking::ClientBuilder, HttpError> {
258    #[cfg(target_os = "android")]
259    {
260        return Ok(builder.tls_certs_only(android_root_certificates()?));
261    }
262
263    #[cfg(not(target_os = "android"))]
264    {
265        Ok(builder)
266    }
267}
268
269#[cfg(all(target_os = "android", feature = "http-native"))]
270fn android_root_certificates() -> Result<Vec<reqwest::Certificate>, HttpError> {
271    certificates_from_der_chain(
272        webpki_root_certs::TLS_SERVER_ROOT_CERTS
273            .iter()
274            .map(|certificate| certificate.as_ref()),
275    )
276}
277
278#[cfg(any(
279    all(test, not(target_arch = "wasm32"), feature = "http-native"),
280    all(target_os = "android", feature = "http-native")
281))]
282fn certificates_from_der_chain<'a, I>(
283    certificates: I,
284) -> Result<Vec<reqwest::Certificate>, HttpError>
285where
286    I: IntoIterator<Item = &'a [u8]>,
287{
288    certificates
289        .into_iter()
290        .enumerate()
291        .map(|(index, der)| {
292            reqwest::Certificate::from_der(der).map_err(|err| {
293                HttpError::ClientInit(format!(
294                    "Failed to load TLS root certificate {index}: {err}"
295                ))
296            })
297        })
298        .collect()
299}
300
301#[cfg(all(target_arch = "wasm32", feature = "web-http"))]
302async fn fetch_text_web(url: &str) -> Result<String, HttpError> {
303    use wasm_bindgen::JsCast;
304    use wasm_bindgen_futures::JsFuture;
305    use web_sys::{Request, RequestInit, RequestMode, Response};
306
307    let opts = RequestInit::new();
308    opts.set_method("GET");
309    opts.set_mode(RequestMode::Cors);
310
311    let request =
312        Request::new_with_str_and_init(url, &opts).map_err(|err| HttpError::RequestFailed {
313            url: url.to_string(),
314            message: format!("{:?}", err),
315        })?;
316
317    let window = web_sys::window().ok_or(HttpError::NoWindow)?;
318    let resp_value = JsFuture::from(window.fetch_with_request(&request))
319        .await
320        .map_err(|err| HttpError::RequestFailed {
321            url: url.to_string(),
322            message: format!("{:?}", err),
323        })?;
324
325    let resp: Response = resp_value
326        .dyn_into()
327        .map_err(|_| HttpError::InvalidResponse {
328            url: url.to_string(),
329            message: "Response is not a Response object".to_string(),
330        })?;
331
332    if !resp.ok() {
333        return Err(HttpError::HttpStatus {
334            url: url.to_string(),
335            status: resp.status(),
336        });
337    }
338
339    let text_promise = resp.text().map_err(|err| HttpError::BodyReadFailed {
340        url: url.to_string(),
341        message: format!("{:?}", err),
342    })?;
343    let text_value =
344        JsFuture::from(text_promise)
345            .await
346            .map_err(|err| HttpError::BodyReadFailed {
347                url: url.to_string(),
348                message: format!("{:?}", err),
349            })?;
350
351    text_value
352        .as_string()
353        .ok_or_else(|| HttpError::InvalidResponse {
354            url: url.to_string(),
355            message: "Response body is not a string".to_string(),
356        })
357}
358
359#[cfg(all(target_arch = "wasm32", not(feature = "web-http")))]
360async fn fetch_text_web(_url: &str) -> Result<String, HttpError> {
361    Err(HttpError::UnsupportedFeature {
362        operation: "web HTTP text requests",
363        feature: "web-http",
364    })
365}
366
367#[cfg(all(target_arch = "wasm32", feature = "web-http"))]
368async fn fetch_bytes_web(url: &str) -> Result<Vec<u8>, HttpError> {
369    use wasm_bindgen::JsCast;
370    use wasm_bindgen_futures::JsFuture;
371    use web_sys::{Request, RequestInit, RequestMode, Response};
372
373    let opts = RequestInit::new();
374    opts.set_method("GET");
375    opts.set_mode(RequestMode::Cors);
376
377    let request =
378        Request::new_with_str_and_init(url, &opts).map_err(|err| HttpError::RequestFailed {
379            url: url.to_string(),
380            message: format!("{:?}", err),
381        })?;
382
383    let window = web_sys::window().ok_or(HttpError::NoWindow)?;
384    let resp_value = JsFuture::from(window.fetch_with_request(&request))
385        .await
386        .map_err(|err| HttpError::RequestFailed {
387            url: url.to_string(),
388            message: format!("{:?}", err),
389        })?;
390
391    let resp: Response = resp_value
392        .dyn_into()
393        .map_err(|_| HttpError::InvalidResponse {
394            url: url.to_string(),
395            message: "Response is not a Response object".to_string(),
396        })?;
397
398    if !resp.ok() {
399        return Err(HttpError::HttpStatus {
400            url: url.to_string(),
401            status: resp.status(),
402        });
403    }
404
405    let bytes_promise = resp
406        .array_buffer()
407        .map_err(|err| HttpError::BodyReadFailed {
408            url: url.to_string(),
409            message: format!("{:?}", err),
410        })?;
411    let bytes_value =
412        JsFuture::from(bytes_promise)
413            .await
414            .map_err(|err| HttpError::BodyReadFailed {
415                url: url.to_string(),
416                message: format!("{:?}", err),
417            })?;
418
419    let array = js_sys::Uint8Array::new(&bytes_value);
420    Ok(array.to_vec())
421}
422
423#[cfg(all(target_arch = "wasm32", not(feature = "web-http")))]
424async fn fetch_bytes_web(_url: &str) -> Result<Vec<u8>, HttpError> {
425    Err(HttpError::UnsupportedFeature {
426        operation: "web HTTP byte requests",
427        feature: "web-http",
428    })
429}
430
431pub fn default_http_client() -> HttpClientRef {
432    Arc::new(DefaultHttpClient::new())
433}
434
435pub fn local_http_client() -> CompositionLocal<HttpClientRef> {
436    thread_local! {
437        static LOCAL_HTTP_CLIENT: std::cell::RefCell<Option<CompositionLocal<HttpClientRef>>> = const { std::cell::RefCell::new(None) };
438    }
439
440    LOCAL_HTTP_CLIENT.with(|cell| {
441        let mut local = cell.borrow_mut();
442        local
443            .get_or_insert_with(|| compositionLocalOfWithPolicy(default_http_client, Arc::ptr_eq))
444            .clone()
445    })
446}
447
448#[cfg(test)]
449mod tests {
450    use super::*;
451    use crate::run_test_composition;
452    use cranpose_core::CompositionLocalProvider;
453    use std::cell::RefCell;
454    use std::rc::Rc;
455    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
456    use std::thread;
457
458    struct TestHttpClient;
459
460    impl HttpClient for TestHttpClient {
461        fn get_text<'a>(&'a self, _url: &'a str) -> HttpFuture<'a, String> {
462            Box::pin(async { Ok("ok".to_string()) })
463        }
464    }
465
466    #[test]
467    fn default_http_client_is_available() {
468        let client = default_http_client();
469        let cloned = client.clone();
470        assert_eq!(Arc::strong_count(&client), 2);
471        drop(cloned);
472        assert_eq!(Arc::strong_count(&client), 1);
473    }
474
475    #[test]
476    fn default_http_client_has_no_process_global_native_client_cache() {
477        let source = include_str!("http.rs");
478        let once_lock = ["Once", "Lock"].concat();
479        let static_client = ["static ", "CLIENT"].concat();
480        let native_client_fn = ["fn ", "native_client()"].concat();
481
482        assert!(
483            !source.contains(&static_client)
484                && !source.contains(&native_client_fn)
485                && !source.contains(&once_lock),
486            "native HTTP client state must be owned by DefaultHttpClient instead of a process-global cache"
487        );
488    }
489
490    #[test]
491    fn test_client_uses_default_get_bytes_from_text() {
492        let client = TestHttpClient;
493        let bytes = pollster::block_on(client.get_bytes("https://example.com")).expect("bytes");
494        assert_eq!(bytes, b"ok".to_vec());
495    }
496
497    #[test]
498    fn map_ordered_concurrent_preserves_input_order() {
499        let inputs = [3usize, 1, 4, 1, 5];
500        let outputs = pollster::block_on(map_ordered_concurrent(&inputs, 2, |value| async move {
501            value * 10
502        }))
503        .expect("ordered concurrent mapping");
504
505        assert_eq!(outputs, vec![30, 10, 40, 10, 50]);
506    }
507
508    #[test]
509    fn native_ordered_concurrency_is_not_tied_to_native_http_client_feature() {
510        let source = include_str!("http.rs");
511        assert!(
512            source.contains(
513                "#[cfg(not(target_arch = \"wasm32\"))]\npub async fn map_ordered_concurrent"
514            ),
515            "native ordered concurrency must stay available without the HTTP client feature"
516        );
517        assert!(
518            !source.contains(
519                "all(not(target_arch = \"wasm32\"), feature = \"http-native\")]\npub async fn map_ordered_concurrent"
520            ) && !source.contains(
521                "all(not(target_arch = \"wasm32\"), not(feature = \"http-native\"))]\npub async fn map_ordered_concurrent"
522            ),
523            "native ordered concurrency must not split into HTTP-enabled and HTTP-disabled behavior"
524        );
525    }
526
527    #[cfg(not(target_arch = "wasm32"))]
528    #[test]
529    fn map_ordered_concurrent_reports_worker_panic() {
530        let inputs = [1usize];
531        let should_panic = Arc::new(std::sync::atomic::AtomicBool::new(true));
532        let should_panic_for_task = Arc::clone(&should_panic);
533
534        let error = pollster::block_on(map_ordered_concurrent(&inputs, 1, move |_| {
535            let should_panic = Arc::clone(&should_panic_for_task);
536            async move {
537                if should_panic.load(std::sync::atomic::Ordering::SeqCst) {
538                    panic!("test worker panic");
539                }
540                1usize
541            }
542        }))
543        .expect_err("worker panic should be reported");
544
545        assert!(matches!(error, HttpError::WorkerPanicked { .. }));
546    }
547
548    #[test]
549    fn local_http_client_can_be_overridden() {
550        let local = local_http_client();
551        let default_client = default_http_client();
552        let custom_client: HttpClientRef = Arc::new(TestHttpClient);
553        let captured = Rc::new(RefCell::new(None));
554
555        {
556            let captured_for_closure = Rc::clone(&captured);
557            let custom_client = custom_client.clone();
558            let local_for_provider = local.clone();
559            let local_for_read = local.clone();
560            run_test_composition(move || {
561                let captured = Rc::clone(&captured_for_closure);
562                let local_for_read = local_for_read.clone();
563                CompositionLocalProvider(
564                    vec![local_for_provider.provides(custom_client.clone())],
565                    move || {
566                        let current = local_for_read.current();
567                        *captured.borrow_mut() = Some(current);
568                    },
569                );
570            });
571        }
572
573        let current = captured.borrow().as_ref().expect("client captured").clone();
574        assert!(Arc::ptr_eq(&current, &custom_client));
575        assert!(!Arc::ptr_eq(&current, &default_client));
576    }
577
578    #[cfg(all(not(target_arch = "wasm32"), not(feature = "http-native")))]
579    #[test]
580    fn default_http_client_reports_disabled_native_http_feature() {
581        let error = pollster::block_on(default_http_client().get_text("https://example.com"))
582            .expect_err("native HTTP should be feature-gated");
583
584        assert!(matches!(
585            error,
586            HttpError::UnsupportedFeature {
587                feature: "http-native",
588                ..
589            }
590        ));
591    }
592
593    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
594    #[test]
595    fn native_http_client_builds() {
596        build_native_client().expect("native HTTP client should initialize");
597    }
598
599    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
600    #[test]
601    fn certificates_from_der_chain_accepts_valid_roots() {
602        let certificates = certificates_from_der_chain(
603            webpki_root_certs::TLS_SERVER_ROOT_CERTS
604                .iter()
605                .take(3)
606                .map(|certificate| certificate.as_ref()),
607        )
608        .expect("root certificates should parse");
609
610        assert_eq!(certificates.len(), 3);
611    }
612
613    #[cfg(all(not(target_arch = "wasm32"), feature = "http-native"))]
614    #[test]
615    fn default_http_client_fetches_text_from_local_server() {
616        use std::io::{Read, Write};
617        use std::net::TcpListener;
618
619        let listener = match TcpListener::bind("127.0.0.1:0") {
620            Ok(listener) => listener,
621            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
622                eprintln!("skipping local HTTP server bind in restricted test environment: {err}");
623                return;
624            }
625            Err(err) => panic!("bind local test server: {err}"),
626        };
627        let address = listener
628            .local_addr()
629            .expect("read local test server address");
630        let server = thread::spawn(move || {
631            let (mut stream, _) = listener.accept().expect("accept local test request");
632            let mut request = [0_u8; 1024];
633            let _ = stream.read(&mut request).expect("read local test request");
634            let body = "cranpose-http-test";
635            write!(
636                stream,
637                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
638                body.len(),
639                body
640            )
641            .expect("write local test response");
642        });
643
644        let url = format!("http://{address}");
645        let text = pollster::block_on(default_http_client().get_text(&url))
646            .expect("fetch text from local test server");
647        server.join().expect("join local test server");
648
649        assert_eq!(text, "cranpose-http-test");
650    }
651}